目录
- 1. 是什么
- 2. 使用场景
- 3. 如何使用
- 4. 各种BlockingQueue详解以及对比
- 5. ArrayBlockingQueue
- 6. LinkedBlockingQueue
- 7. PriorityBlockingQueue
- 8. SynchronousQueue
- 9. 参考
1. 是什么
线程安全的阻塞队列。
特点:
- 先进先出:
既然是队列那肯定是先进先出 - 阻塞
支持在插入元素时,如果队列已满,那么阻塞,等待队列非满
也支持在删除元素时,如果队列为空,那么阻塞,等待队列非空 - 无界有界
数组容量的大小。无界其实是Integer.MAX_VALUE - 线程安全
2. 使用场景
生产者、消费者
3. 如何使用
| 方法处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
|---|---|---|---|---|
| 插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除方法 | remove() | poll() | take() | poll(time,unit) |
| 检查方法 | element() | peek() | 不可用 | 不可用 |
4. 各种BlockingQueue详解以及对比
| ArrayBlockingQueue | LinkedBlockingQueue | PriorityBlockingQueue | SynchronousQueue | |
|---|---|---|---|---|
| 数据结构 | 数组 | 单向链表 | 数组(二叉堆) | 单向链表 |
| 怎么实现阻塞 | Lock+Condition | Lock+Condition | Lock+Condition | CAS+LockSupport |
| 有界/无界 | 有界 | 有界 | 无界 | 无界(不存储元素) |
| 吞吐量(以LinkedBlockingQueue为基准) | 比LinkedBlockingQueue低(读读、读写、写写相互阻塞) | / (读读、写写相互阻塞,读写不相互阻塞) | 无界(读读、读写、写写相互阻塞) | 比LinkedBlockingQueue高(读写匹配才能进行下去) |
5. ArrayBlockingQueue
5.1. 是什么
使用Object数组实现的有界的阻塞队列
读读、读写、写写相互阻塞
5.2. 如何使用
public class ArrayBlockingQueueTest { public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1); CountDownLatch latch = new CountDownLatch(2); new Thread(()->{ for (int i = 0;;i++) { try { String data = "data" + i; queue.put(data); System.out.println("Producer放入消息:" + data); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } } }).start(); new Thread(()->{ for (;;) { try { System.out.println("Consumer获取消息:" + queue.take()); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } } }).start(); latch.await(); } }
5.2.1. 方法选择
| 方法处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
|---|---|---|---|---|
| 插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除方法 | remove() | poll() | take() | poll(time,unit) |
| 检查方法 | element() | peek() | 不可用 | 不可用 |
5.3. 原理分析
5.3.1. uml

5.3.2. 构造方法
5.3.2.1. 底层使用数组+Lock+Condtion实现
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //底层是数组实现的 final Object[] items; //take, poll, peek or remove等读方法,读取下一个元素的位置 int takeIndex; //put, offer, or add等方法,写入下一个元素的位置 int putIndex; //数组中实际元素的数量 //当count==item.length()的时候说明数组已满 int count; //一个锁说明读写互斥 final ReentrantLock lock; //两个条件量 private final Condition notEmpty;//用来唤醒读线程 private final Condition notFull;//用来唤醒写线程 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } }
5.3.3. put【阻塞】
public void put(E e) throws InterruptedException { checkNotNull(e); //加锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //如果数组已经满了,那么等待。读者取出元素后唤醒 while (count == items.length) notFull.await(); //没满,加入数组 enqueue(e); } finally { lock.unlock(); } }
- 4行:加锁。一旦该写线程加锁其他读写线程都不能同时进来
- 8-9行:如果数组已经满了,那么阻塞等待
- 11行:没满则入队并唤醒读者
下面具体分析:
5.3.3.1. 加锁
//加锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //... } finally { lock.unlock(); }
5.3.3.2. 如果数组已经满了,那么等待
//如果数组已经满了,那么等待。直到读者取出元素后唤醒 while (count == items.length) notFull.await();
5.3.3.3. 没满则入队并唤醒读者
enqueue(e);
- enqueue
private void enqueue(E x) { //把元素加入到队尾 final Object[] items = this.items; items[putIndex] = x; //已插入到末尾,重置插入索引为0 //这个数组是可以循环使用的,不需要扩容。 if (++putIndex == items.length) putIndex = 0; count++; //插入后唤醒读者 notEmpty.signal(); }
5.3.4. take【阻塞】
public E take() throws InterruptedException { //加锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //如果数组为空,那么等待。写者加入元素后唤醒 while (count == 0) notEmpty.await(); //出队 return dequeue(); } finally { //释放锁 lock.unlock(); } }
- 3行:加锁。一旦该读线程加锁其他读写线程都不能同时进来
- 6-8行:如果数组为空,那么等待
- 10行:不为空则出队并唤醒写者
下面具体分析:
5.3.4.1. 加锁
//加锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //... } finally { //释放锁 lock.unlock(); }
5.3.4.2. 如果数组为空,那么等待
//如果数组为空,那么等待。写者加入元素后唤醒 while (count == 0) notEmpty.await();
5.3.4.3. 不为空则出队并唤醒写者
- dequeue
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") //获取最后一个元素并置为null E x = (E) items[takeIndex]; items[takeIndex] = null; //已取到末尾,重置取值索引为0 //这个数组是可以循环使用的,不需要扩容。 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //出队后唤醒写者 notFull.signal(); return x; }
5.3.5. offer【返回特殊值】
public boolean offer(E e) { checkNotNull(e); //加锁 final ReentrantLock lock = this.lock; lock.lock(); try { //已满,直接返回false if (count == items.length) return false; else { //未满,加入队列同时唤醒读者 enqueue(e); return true; } } finally { //解锁 lock.unlock(); } }
5.3.6. poll【返回特殊值】
public E poll() { final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { //长度为0直接返回null,否则出队并唤醒写者 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
5.3.7. add【抛出异常】
public boolean add(E e) { //简单调用AbstractQueue的add方法 return super.add(e); } //AbstractQueue的add方法 public boolean add(E e) { //调用ArrayBlockingQueue的方offer法 if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
5.3.8. remove【抛出异常】
public E remove() { //简单调用poll方法 E x = poll(); if (x != null) return x; else //没有元素,抛出异常 throw new NoSuchElementException(); }
5.3.9. element【抛出异常】
public E element() { //调用peek方法 E x = peek(); if (x != null) return x; else //为空直接抛出异常 throw new NoSuchElementException(); }
5.3.10. peek【返回特殊值】
public E peek() { //加锁 final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { //解锁 lock.unlock(); } } @SuppressWarnings("unchecked") final E itemAt(int i) { //直接返回数组中的第i个元素 return (E) items[i]; }
5.4. 总结
底层使用数组实现,是个有界队列。
并且用了一个锁和两个condition。一个锁说明读写互斥,两个conditon说明读写相互唤醒
6. LinkedBlockingQueue
6.1. 是什么
使用单向链表实现的有界的阻塞队列
读读、写写相互阻塞,读写不相互阻塞
吞吐量比ArrayBlockingQueue高
6.2. 如何使用
public class LinkedBlockingQueueTest { public static void main(String[] args) throws InterruptedException { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1); CountDownLatch latch = new CountDownLatch(2); new Thread(()->{ for (int i = 0;;i++) { try { String data = "data" + i; queue.put(data); System.out.println("Producer放入消息:" + data); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } } }).start(); new Thread(()->{ for (;;) { try { System.out.println("Consumer获取消息:" + queue.take()); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } } }).start(); latch.await(); } }
6.3. 源码分析
6.3.1. 构造方法
6.3.1.1. 底层使用单向链表+Lock+Condition实现
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //最大长度 private final int capacity; //实际长度 private final AtomicInteger count = new AtomicInteger(); //头节点 transient Node<E> head; //尾节点 private transient Node<E> last; //出队时用的锁。锁住队头 private final ReentrantLock takeLock = new ReentrantLock(); //如果读操作的时候队列是空的,那么等待 notEmpty 条件 private final Condition notEmpty = takeLock.newCondition(); //入队时用的锁。锁住队尾 private final ReentrantLock putLock = new ReentrantLock(); // 如果写操作的时候队列是满的,那么等待 notFull 条件 private final Condition notFull = putLock.newCondition(); public LinkedBlockingQueue() { //相当于无界队列 this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity;//有界队列 last = head = new Node<E>(null);//头节点是个占位符 } }
6.3.1.2. Node
static class Node<E> { E item; //单向队列 Node<E> next; Node(E x) { item = x; } }
结构如下图:

6.3.2. put【阻塞】
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //加写锁 putLock.lockInterruptibly(); try { //链表实际容量到达链表最大容量,阻塞等待读者取出 while (count.get() == capacity) { notFull.await(); } //加入尾部 enqueue(node); c = count.getAndIncrement();//+1,不过返回的是c的原值 if (c + 1 < capacity) notFull.signal();//唤醒其他写者? } finally { putLock.unlock(); } //c == 0 说明原来queue是空的, 那么可能有其他读线程阻塞住了。 if (c == 0) //所以这里 唤醒正在 poll/take 等待中的线程 signalNotEmpty(); }
- 8行:加写锁。一旦加了写锁其他写者无法同时进来写入数据,但是读者可以同时进来读
- 11-13行:链表实际容量到达链表最大容量,那么写者阻塞等待读者取出
- 15行:链表没有满的话,那么把该元素添加至尾部
- 16行:更新队列中元素的数量,+1,返回原值
- 17-18行:添加了元素后发现队列还是没有满,那么唤醒其他写者继续添加
- 23-25行:由这句
c = count.getAndIncrement();可看出+1后返回的是c的原值,如果为0说明之前队列可能为空,那么加读锁、唤醒读者读取元素、解读锁
下面具体分析:
6.3.2.1. 加写锁
//加写锁 putLock.lockInterruptibly(); try { //... } finally { putLock.unlock(); }
6.3.2.2. 如果队列已满那么等待
//链表实际容量到达链表最大容量,阻塞等待读者取出 while (count.get() == capacity) { notFull.await(); }
6.3.2.3. 未满则入队
- enqueue
private void enqueue(Node<E> node) { //把节点加入到链表尾部,并且更新last指针 last = last.next = node; }
6.3.2.4. 入队完发现队列没满,那么继续唤醒写者入队
if (c + 1 < capacity) notFull.signal();//唤醒其他写者
6.3.2.5. 入队完解锁后发现之前队列是空的,那么唤醒读者
//c == 0 说明原来queue是空的, 那么可能有其他读线程阻塞住了。 if (c == 0) //所以这里 唤醒正在 poll/take 等待中的线程 signalNotEmpty();
- signalNotEmpty
private void signalNotEmpty() { //加读锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //唤醒读者 notEmpty.signal(); } finally { takeLock.unlock(); } }
6.3.3. take【阻塞】
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //加了读锁 takeLock.lockInterruptibly(); try { //长度为0,阻塞等待写着加入 while (count.get() == 0) { notEmpty.await(); } //删除第一个节点 x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal();//唤醒其他读者? } finally { takeLock.unlock(); } //c == capacity 说明原来queue是满的, 那么可能有其他写线程阻塞住了。 if (c == capacity) //所以这里 唤醒正在 put 等待中的线程 signalNotFull(); return x; }
- 7行:加读锁。一旦加了读锁其他读者无法同时进来读取数据,但是写者可以同时进来写数据
- 10-12行:链表实际容量为0,那么读者阻塞等待写者写入
- 14行:链表不为空的话,那么删除链表头部的元素
- 15行:更新队列中元素的数量,-1,返回原值
- 16-17行:取出了元素后发现队列还是不为空,那么唤醒其他读者继续读取
- 22-24行:由这句
c = count.getAndDecrement();可看出-1后返回的是c的原值,当他为capacity的时候说明之前队列可能是满的,那么加写锁、唤醒写者写入元素、解写锁
下面具体分析:
6.3.3.1. 加读锁
//加了读锁 takeLock.lockInterruptibly(); try { //.... } finally { takeLock.unlock(); }
6.3.3.2. 队列为空那么等待
//长度为0,阻塞等待写着加入 while (count.get() == 0) { notEmpty.await(); }
6.3.3.3. 未空则出队
- dequeue
private E dequeue() { Node<E> h = head;//头节点是个占位符 Node<E> first = h.next;//真正的第一个节点 h.next = h; // help GC 头节点next指向头节点自己? head = first;//更新头节点指向第一个节点(即从队头出队) E x = first.item; first.item = null; return x; }
6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
if (c > 1) notEmpty.signal();//唤醒其他读者?
6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者
if (c == capacity) //加写锁,唤醒写者 signalNotFull();
- signalNotFull
private void signalNotFull() { final ReentrantLock putLock = this.putLock; //加写锁 putLock.lock(); try { //通知写者没满,可以写了 notFull.signal(); } finally { putLock.unlock(); } }
6.3.4. offer 返回特殊值
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0;//跟put不同的地方在这里,返回而不阻塞 }
6.3.5. poll 返回特殊值
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0)//跟take不同的地方在这里,返回null return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
6.3.6. peek 返回特殊值
public E peek() { if (count.get() == 0)//为空返回null return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } //不需要唤醒写着,因为没有出队 }
6.4. 总结
底层使用单向数组实现,可以有界也可以无界队列。
并且用了两个锁和两个condition。两个个锁说明读写可以同时进行,两个conditon说明读写相互唤醒
7. PriorityBlockingQueue
7.1. 是什么
底层使用数组(二叉堆)实现的无界的阻塞队列
读读、读写、写写相互阻塞
可以排序
由于无界,所以put操作不会阻塞,但是take操作会阻塞(队列为空的时候)
7.1.1. 二叉堆
一颗完全二叉树,堆序性质为,每个节点的值都小于其左右子节点的值,二叉堆中最小的值就是根节点。
底层用数组进行存储。对于数组中的元素 a[i],其左子节点为 a[2i+1],其右子节点为 a[2i + 2],其父节点为 a[(i-1)/2]。
结构如下图:

7.2. 如何使用
public class PriorityBlockingQueueTest { public static void main(String[] args) throws InterruptedException { PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(1); CountDownLatch latch = new CountDownLatch(2); new Thread(()->{ for (int i = 0;;i++) { try { String data = "data" + i; queue.put(data); System.out.println("Producer放入消息:" + data); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } } }).start(); new Thread(()->{ for (;;) { try { System.out.println("Consumer获取消息:" + queue.take()); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } } }).start(); latch.await(); } }
7.3. 原理分析
7.3.1. 构造方法
7.3.1.1. 底层使用数组+Lock+Condition实现
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //底层使用数组实现(堆) private transient Object[] queue; //实际使用的长度 private transient int size; //comparator确定元素的顺序,如果是null那么是自然序 private transient Comparator<? super E> comparator; //只有一把锁说明读写互斥 private final ReentrantLock lock; //只有一个condition说明只有读或者写的操作是阻塞的 //当队列不为空的时候唤醒读操作 private final Condition notEmpty; // 这个也是用于锁,用于数组扩容的时候,需要先获取到这个锁,才能进行扩容操作 // 其使用 CAS 操作 private transient volatile int allocationSpinLock; public PriorityBlockingQueue() { //默认11个,自然序 this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } }
7.3.2. put
public void put(E e) { //转调offer offer(e); // never need to block }
7.3.2.1. 转调offer,不需要阻塞
- offer
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; //加锁 lock.lock(); int n, cap; Object[] array; //如果当前队列中的元素个数 >= 数组的大小,那么需要扩容了 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; //自然序。把e加入到数组array末尾的位置n,然后与父亲比较,若是比父亲小则交换位置 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; //唤醒读者 notEmpty.signal(); } finally { lock.unlock(); } return true; }
- 6行:加锁。一旦该写者加了锁,那么其他读写线程不能进来操作
- 10-11行:根据需要进行扩容
- 13-18行:插入数组末尾,并且通过上浮操作保持堆的性质
- 19行:队列中元素的实际数量+1
- 21行:其他读者可能在队列为空的时候阻塞,这里需要唤醒
由上面的代码可以看出写的时候是不需要阻塞的,因为这个队列是无界的
7.3.2.1.1. 加锁
final ReentrantLock lock = this.lock; //加锁 lock.lock(); } finally { lock.unlock(); }
7.3.2.1.2. 判断是否需要扩容
//如果当前队列中的元素个数 >= 数组的大小,那么需要扩容了 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap);
7.3.2.1.2.1. 需要的话进行扩容
- tryGrow
private void tryGrow(Object[] array, int oldCap) { //为什么这里释放锁?让读的线程可以读而不至于再扩容的时候阻塞 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; //allocationSpinLock为0表示没有其他进行扩容,1表示有 //当没有其他线程扩容 且 当前线程CAS加锁成功才进行扩容 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { //如果旧容量<64,那么新容量=2*旧容量+2 //否则为1.5*旧容量 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); //溢出判断 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } //确实有扩容 且 array没有变动--说明没有其他线程在扩容? if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { //释放锁 allocationSpinLock = 0; } } //其他线程在扩容,让出CPU if (newArray == null) // back off if another thread is allocating Thread.yield(); //这里有重新加锁了?扩容完毕,需要真正的修改数组了,这里需要阻塞读 lock.lock(); //转移旧数组到新数组 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
7.3.2.1.3. 把元素加入堆的末尾
//自然序。把e加入到数组array末尾的位置n,然后与父亲比较,若是比父亲小则交换位置 if (cmp == null) siftUpComparable(n, e, array);
7.3.2.1.3.1. 上浮操作调整堆
- siftUpComparable
//把x,插入到堆数组array,的k位置 private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; //最多调整到root即0 while (k > 0) { //父节点的位置 (k-1)/2 int parent = (k - 1) >>> 1; Object e = array[parent]; //如果x比父节点大,那么退出 if (key.compareTo((T) e) >= 0) break; //否则与父节点交换位置 array[k] = e; //从父节点继续往上 k = parent; } //走到这里说明k位置存放x满足二叉堆的性质:比父节点大,比左右孩子小 array[k] = key; }
7.3.2.1.3.2. 调整的过程图

7.3.3. take
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //加锁 lock.lockInterruptibly(); E result; try { //出队元素为空那么阻塞等待唤醒 while ( (result = dequeue()) == null) notEmpty.await(); } finally { //解锁 lock.unlock(); } return result; }
- 4行:加锁。一旦该读者加了锁,那么其他读写线程不能进来操作
- 8-9行:出队,如果队列为空那么进行阻塞,等待队列不为空的时候由写者唤醒
7.3.3.1. 加锁
final ReentrantLock lock = this.lock; //加锁 lock.lockInterruptibly(); try { //... } finally { //解锁 lock.unlock(); }
7.3.3.2. 一直阻塞等待,直到出队成功
//出队元素为空那么阻塞等待唤醒 while ( (result = dequeue()) == null) notEmpty.await();
7.3.3.2.1. 出队具体操作
- dequeue
private E dequeue() { //队列为空返回null int n = size - 1; if (n < 0) return null; else { Object[] array = queue; //root节点,即0号位置就是出队的元素 E result = (E) array[0]; E x = (E) array[n];//数组末尾的元素x array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) //把数组末尾的元素x放到0号位置,调整堆 siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
- 7-12行:移除堆顶,末尾元素放到堆顶
- 14-18行:下沉操作调整堆
7.3.3.2.1.1. 移除堆顶,末尾元素放到堆顶
Object[] array = queue; //root节点,即0号位置就是出队的元素 E result = (E) array[0]; E x = (E) array[n];//数组末尾的元素x array[n] = null;
7.3.3.2.1.2. 下沉操作调整堆
- siftDownComparable
//把元素x,插入到长度为n,的堆数组array的,k位置 private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; //只能在 非叶子节点(有孩子的节点)调整 int half = n >>> 1; while (k < half) { //左孩子 int child = (k << 1) + 1; // assume left child is least Object c = array[child]; //右孩子 int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; //c是左右孩子中较小的那个 //如果要插入的元素比左右孩子都小,那么二叉堆性质以满足,无需调整 if (key.compareTo((T) c) <= 0) break; //否则将较小的孩子上移 array[k] = c; //继续往下调整 k = child; } //走到这里说明k位置存放x满足二叉堆的性质:比父节点大,比左右孩子小 array[k] = key; } }
7.3.3.2.1.3. 调整的过程图

7.4. 总结
无界队列,底层使用二叉堆实现,有序。
写不阻塞,读阻塞
8. SynchronousQueue
8.1. 是什么
底层使用单向实现的阻塞队列,不存储元素
一个写者必须同时有一个读者才能进行下去,反之亦然。
否则写者将会一直阻塞或者读者将会一直阻塞
8.2. 使用
public class SynchronousQueueTest { public static void main(String[] args) throws InterruptedException { SynchronousQueue <String> queue = new SynchronousQueue<>(); CountDownLatch latch = new CountDownLatch(2); new Thread(()->{ for (int i = 0;;i++) { try { String data = "data" + i; queue.put(data); System.out.println("Producer放入消息:" + data);//不支持peek操作 TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } } }).start(); new Thread(()->{ for (;;) { try { System.out.println("Consumer获取消息:" + queue.take()); } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } } }).start(); latch.await(); } }
8.3. 原理
8.3.1. 构造方法
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private transient volatile Transferer<E> transferer; public SynchronousQueue() { this(false);//默认不公平,即用stack } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); } //单向链表头、尾 transient volatile QNode head; transient volatile QNode tail; }
8.3.1.1. Transfer
abstract static class Transferer<E> { //put和take操作都会调用这个 //如果e为空,那么代表读者的take操作 //如果e不为空,那么代表写着的put操作 // 第二个参数代表是否设置超时,如果设置超时,超时时间是第三个参数的值 // 返回值如果是 null,代表超时,或者中断。具体是哪个,可以通过检测中断状态得到。 abstract E transfer(E e, boolean timed, long nanos); }
8.3.1.2. QNode
static final class QNode { volatile QNode next; // 单向链表 volatile Object item; // CAS'ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData;//true表示写,false表示读 }
8.3.2. put 阻塞
public void put(E e) throws InterruptedException { //写着e保证不为空 if (e == null) throw new NullPointerException(); //调用Transfer的transfer方法传递元素给读者 if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }
8.3.2.1. 调用TransferQueue
- TransferQueue transfer
E transfer(E e, boolean timed, long nanos) { QNode s = null; boolean isData = (e != null);//e不为空表示写(true),为空表示读(false) for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin //队列为空或者队列中尾节点的模式与当前节点一样(即都是写或者都是读的情况) //那么直接将当前节点入队 if (h == t || t.isData == isData) { QNode tn = t.next; //之前的tail跟当前tail不同,说明已经有节点入队了,重新来一次 if (t != tail) continue; //走到这里说明tail没有改变,可以tail.next居然不为空,说明有节点入队,但是还没有修改tail //那么把tail指向tail.next即可 if (tn != null) { advanceTail(t, tn);//tail==t的话,把tail指向tn continue; } //设置了超时但是时间不对 if (timed && nanos <= 0) return null; //构造当前节点 if (s == null) s = new QNode(e, isData); //插入到链表尾部 if (!t.casNext(null, s)) continue; //tail==t的话,把tail指向s advanceTail(t, s); //自旋或者阻塞等待另一个模式的线程过来唤醒 //写线程拿到的是null,读线程拿到的是写线程的值 Object x = awaitFulfill(s, e, timed, nanos); //走到这里说明已经唤醒了,继续往下执行 if (x == s) { // wait was cancelled clean(t, s); return null; } //当前节点的next不是当前节点 //那么当头节点==尾节点的时候,CAS设置头为当前节点 if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } //一读一写刚好匹配的情况 else { //头节点的next是当前节点 QNode m = h.next; //头节点或者尾节点或者头节点的next为空了,即链表改变了,重新开始 if (t != tail || m == null || h != head) continue; //失败重试的情况 Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } //CAS需改头节点。如果h==head,那么修改头节点为当前节点 advanceHead(h, m); // successfully fulfilled //唤醒当前节点的线程。对应awaitFulfill LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
- advanceTail
void advanceTail(QNode t, QNode nt) { //如果当前尾节点==传过来的尾节点的话 if (tail == t) //CAS操作修改尾节点指针指向nt UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); }
- awaitFulfill
//要么自旋、要么阻塞 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { //设置了超时,那么计算超时到期的时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //头节点的下一个节点就是我自己了,那么我不入队,而是自旋等待 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { //当前线程被中断了,那么将当前节点的item属性CAS设置为e if (w.isInterrupted()) s.tryCancel(e); //这里是这个方法的唯一的出口 //当前节点的item属性跟e不同的时候 Object x = s.item; if (x != e) return x; //超时了,那么将当前节点的item属性CAS设置为e if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } //每次循环自旋-1 if (spins > 0) --spins; //走到这里说明自旋到了最大次数或者没有设置自旋 //当前节点还没关联线程,那么关联 else if (s.waiter == null) s.waiter = w; //没有设置超时,那么阻塞 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
- tryCancel
void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); }
- advanceHead
void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next }
8.3.3. take 阻塞
public E take() throws InterruptedException { //调用Transfer的transfer方法从写者获取元素 E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
8.4. 总结
不存储元素,吞吐量比LinkedBlockingQueue高
读、写必须匹配才能进行下去,否则会加入队列阻塞等待,直到另一个模式的线程到来唤醒