大纲
1.并发安全的数组列表CopyOnWriteArrayList
2.并发安全的链表队列ConcurrentLinkedQueue
3.并发编程中的阻塞队列概述
4.JUC的各种阻塞队列介绍
5.LinkedBlockingQueue的具体实现原理
6.基于两个队列实现的集群同步机制
1.并发安全的数组列表CopyOnWriteArrayList
(1)CopyOnWriteArrayList的初始化
(2)基于锁 + 写时复制机制实现的增删改操作
(3)使用写时复制的原因是读操作不加锁 + 不使用Unsafe读取数组元素
(4)对数组进行迭代时采用了副本快照机制
(5)核心思想是通过弱一致性提升读并发
(6)写时复制的总结
(1)CopyOnWriteArrayList的初始化
并发安全的HashMap是ConcurrentHashMap
并发安全的ArrayList是CopyOnWriteArrayList
并发安全的LinkedList是ConcurrentLinkedQueue
从CopyOnWriteArrayList的构造方法可知,CopyOnWriteArrayList基于Object对象数组实现。
这个Object对象数组array会使用volatile修饰,保证了多线程下的可见性。只要有一个线程修改了数组array,其他线程可以马上读取到最新值。
//A thread-safe variant of java.util.ArrayList in which all mutative operations //(add, set, and so on) are implemented by making a fresh copy of the underlying array. public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { ... //The lock protecting all mutators final transient ReentrantLock lock = new ReentrantLock(); //The array, accessed only via getArray/setArray. private transient volatile Object[] array; //Creates an empty list. public CopyOnWriteArrayList() { setArray(new Object[0]); } //Sets the array. final void setArray(Object[] a) { array = a; } ... }
(2)基于锁 + 写时复制机制实现的增删改操作
一.使用独占锁解决对数组的写写并发问题
每个CopyOnWriteArrayList都有一个Object数组 + 一个ReentrantLock锁。在对Object数组进行增删改时,都要先获取锁,保证只有一个线程增删改。从而确保多线程增删改CopyOnWriteArrayList的Object数组是并发安全的。注意:获取锁的动作需要在执行getArray()方法前执行。
但因为获取独占锁,所以导致CopyOnWriteArrayList的写并发并性能不太好。而ConcurrentHashMap由于通过CAS设置 + 分段加锁,所以写并发性能很高。
二.使用写时复制机制解决对数组的读写并发问题
CopyOnWrite就是写时复制。写数据时不直接在当前数组里写,而是先把当前数组的数据复制到新数组里。然后再在新数组里写数据,写完数据后再将新数组赋值给array变量。这样原数组由于没有了array变量的引用,很快就会被JVM回收掉。
其中会使用System.arraycopy()方法和Arrays.copyOf()方法来复制数据到新数组,从Arrays.copyOf(elements, len + 1)可知,新数组的大小比原数组大小多1。
所以CopyOnWriteArrayList不需要进行数组扩容,这与ArrayList不一样。ArrayList会先初始化一个固定大小的数组,然后数组大小达到阈值时会扩容。
三.总结
为了解决CopyOnWriteArrayList的数组写写并发问题,使用了锁。
为了解决CopyOnWriteArrayList的数组读写并发问题,使用了写时复制。
所以CopyOnWriteArrayList可以保证多线程对数组写写 + 读写的并发安全。
//A thread-safe variant of java.util.ArrayList in which all mutative operations //(add, set, and so on) are implemented by making a fresh copy of the underlying array. public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { ... //The lock protecting all mutators final transient ReentrantLock lock = new ReentrantLock(); //The array, accessed only via getArray/setArray. private transient volatile Object[] array; //Creates an empty list. public CopyOnWriteArrayList() { setArray(new Object[0]); } //Sets the array. final void setArray(Object[] a) { array = a; } //Gets the array. Non-private so as to also be accessible from CopyOnWriteArraySet class. final Object[] getArray() { return array; } //增:Appends the specified element to the end of this list. public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } } //删:Removes the element at the specified position in this list. //Shifts any subsequent elements to the left (subtracts one from their indices). //Returns the element that was removed from the list. public E remove(int index) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1; if (numMoved == 0) { setArray(Arrays.copyOf(elements, len - 1)); } else { //先创建新数组,新数组的大小为len-1,比原数组的大小少1 Object[] newElements = new Object[len - 1]; //把原数组里从0开始拷贝index个元素到新数组里,并且从新数组的0位置开始放置 System.arraycopy(elements, 0, newElements, 0, index); //把原数组从index+1开始拷贝numMoved个元素到新数组里,并且从新数组的index位置开始放置; System.arraycopy(elements, index + 1, newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { lock.unlock(); } } //改:Replaces the element at the specified position in this list with the specified element. public E set(int index, E element) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); E oldValue = get(elements, index); if (oldValue != element) { int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len); newElements[index] = element; setArray(newElements); } else { //Not quite a no-op; ensures volatile write semantics setArray(elements); } return oldValue; } finally { lock.unlock(); } } ... }
(3)使用写时复制的原因是读操作不加锁 + 不使用Unsafe读取数组元素
CopyOnWriteArrayList的增删改采用写时复制的原因在于get操作不需加锁。get操作就是先获取array数组,然后再通过index定位返回对应位置的元素。
由于在写数据的时候,首先更新的是复制了原数组数据的新数组。所以同一时间大量的线程读取数组数据时,都会读到原数组的数据,因此读写之间不会出现并发冲突的问题。
而且在写数据的时候,在更新完新数组之后,才会更新volatile修饰的数组变量。所以读操作只需要直接对volatile修饰的数组变量进行读取,就能获取最新的数组值。
如果不使用写时复制机制,那么即便有写线程先更新了array引用的数组中的元素,后续的读线程也只是具有对使用volatile修饰的array引用的可见性,而不会具有对array引用的数组中的元素的可见性。所以此时只要array引用没有发生改变,读线程还是会读到旧的元素,除非使用Unsafe.getObjectVolatile()方法来获取array引用的数组的元素。
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { ... //The array, accessed only via getArray/setArray. private transient volatile Object[] array; //Gets the array. Non-private so as to also be accessible from CopyOnWriteArraySet class. final Object[] getArray() { return array; } public E get(int index) { //先通过getArray()方法获取array数组,然后再通过get()方法定位到数组某位置的元素 return get(getArray(), index); } private E get(Object[] a, int index) { return (E) a[index]; } ... }
(4)对数组进行迭代时采用了副本快照机制
CopyOnWriteArrayList的Iterator迭代器里有一个快照数组snapshot,该数组指向的就是创建迭代器时CopyOnWriteArrayList的当前数组array。
所以使用CopyOnWriteArrayList的迭代器进行迭代时,会遍历快照数组。此时如果有其他线程更新了数组array,也不会影响迭代的过程。
public class CopyOnWriteArrayListDemo { static List<String> list = new CopyOnWriteArrayList<String>(); public static void main(String[] args) { list.add("k"); System.out.println(list); Iterator<String> iterator = list.iterator(); while (iterator.hasNext()) { System.out.println(iterator.next()); } } } public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { ... public Iterator<E> iterator() { return new COWIterator<E>(getArray(), 0); } ... static final class COWIterator<E> implements ListIterator<E> { private final Object[] snapshot; private int cursor; private COWIterator(Object[] elements, int initialCursor) { cursor = initialCursor; snapshot = elements; } ... } ... }
(5)核心思想是通过最终一致性提升读并发
CopyOnWriteArrayList的核心思想是通过弱一致性来提升读写并发的能力。
CopyOnWriteArrayList基于写时复制机制存在的最大问题是最终一致性。
多个线程并发读写数组,写线程已将新数组修改好,但还没设置给array。此时其他读线程读到的(get或者迭代)都是数组array的数据,于是在同一时刻,读线程和写线程看到的数据是不一致的。这就是写时复制机制存在的问题:最终一致性或弱一致性。
(6)写时复制的总结
一.优点
读读不互斥,读写不互斥,写写互斥。同一时间只有一个线程可以写,写的同时允许其他线程来读。
二.缺点
空间换时间,写的时候内存里会出现一模一样的副本,对内存消耗大。通过数组副本可以保证大量的读不需要和写互斥。如果数组很大,可能要考虑内存占用会是数组大小的几倍。此外使用数组副本来统计数据,会存在统计数据不一致的问题。
三.使用场景
适用于读多写少的场景,这样大量的读操作不会被写操作影响,而且不要求统计数据具有实时性。
2.并发安全的链表队列ConcurrentLinkedQueue
(1)ConcurrentLinkedQueue的介绍
(2)ConcurrentLinkedQueue的构造方法
(3)ConcurrentLinkedQueue的offer()方法
(4)ConcurrentLinkedQueue的poll()方法
(5)ConcurrentLinkedQueue的peak()方法
(6)ConcurrentLinkedQueue的size()方法
(1)ConcurrentLinkedQueue的介绍
ConcurrentLinkedQueue是一种并发安全且非阻塞的链表队列(无界队列)。
ConcurrentLinkedQueue采用CAS机制来保证多线程操作队列时的并发安全。
链表队列会采用先进先出的规则来对结点进行排序。每次往链表队列添加元素时,都会添加到队列的尾部。每次需要获取元素时,都会直接返回队列头部的元素。
并发安全的HashMap是ConcurrentHashMap
并发安全的ArrayList是CopyOnWriteArrayList
并发安全的LinkedList是ConcurrentLinkedQueue
(2)ConcurrentLinkedQueue的构造方法
ConcurrentLinkedQueue是基于链表实现的,链表结点为其内部类Node。
ConcurrentLinkedQueue的构造方法会初始化链表的头结点和尾结点为同一个值为null的Node对象。
Node结点通过next指针指向下一个Node结点,从而组成一个单向链表。而ConcurrentLinkedQueue的head和tail两个指针指向了链表的头和尾结点。
public class ConcurrentLinkedQueueDemo { public static void main(String[] args) { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); queue.offer("张三");//向队尾添加元素 queue.offer("李四");//向队尾添加元素 queue.offer("王五");//向队尾添加元素 System.out.println(queue.peek());//返回队头的元素不出队 System.out.println(queue.poll());//返回队头的元素而且出队 System.out.println(queue.peek());//返回队头的元素不出队 } } //An unbounded thread-safe queue based on linked nodes. //This queue orders elements FIFO (first-in-first-out). //The head of the queue is that element that has been on the queue the longest time. //The tail of the queue is that element that has been on the queue the shortest time. //New elements are inserted at the tail of the queue, //and the queue retrieval operations obtain elements at the head of the queue. //A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection. //Like most other concurrent collection implementations, this class does not permit the use of null elements. public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { ... private transient volatile Node<E> head; private transient volatile Node<E> tail; //构造方法,初始化链表队列的头结点和尾结点为同一个值为null的Node对象 //Creates a ConcurrentLinkedQueue that is initially empty. public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } private static class Node<E> { volatile E item; volatile Node<E> next; private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } } ... }
(3)ConcurrentLinkedQueue的offer()方法
其中关键的代码就是"p.casNext(null, newNode))",就是把p的next指针由原来的指向空设置为指向新的结点,并且通过CAS确保同一时间只有一个线程可以成功执行这个操作。
注意:更新tail指针并不是实时更新的,而是隔一个结点再更新。这样可以减少CAS指令的执行次数,从而降低CAS操作带来的性能影响。
