juc之LinkedBlockingQueue、SynchronousQueue

Author Avatar
Sean Yu 11月 24, 2020
  • 在其它设备中阅读本文章

内容转载自 https://www.jianshu.com/p/28c9d9e34b29 稍有改动

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,满足 FIFO 的特性,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE。因为Integer.MAX_VALUE数值很大,所以可以称为”无界”队列(这里的无界并不是真正意义上的无界)。

LinkedBlockingQueue比ArrayBlockingQueue具有更高的吞吐量,但在大多数并发应用程序中的预测性能较差。

队列中存在最久的元素存在于 head.next 节点上(PS: head 节点永远存在, 且是一个 dummy 节点), 存储时间最短的节点存储在tail上; 通常情况下 LinkedBlockingQueue 的吞吐量要好于 ArrayBlockingQueue.
主要特点:

基于两个lock的 queue, putLock, takeLock; 并且两个锁都有相关联的 condition 用于相应的 await; 每次进行 put/offer 或 take/poll 之后会根据queue的容量进行判断是否需要进行对应的唤醒
队列中总是存在一个 dummy 节点, 每次 poll 节点时获取的是 head.next 节点中的值

为了实现阻塞效果并保证线程安全,它的内部用到了两个锁和两个 Condition。(ArrayBlockingQueue只有一个锁,这导致了LinkedBlockingQueue可以一边取一边放,而ArrayBlockingQueue不行。为啥?因为固定长度的count是个int,不能保证原子性和可见性,至于为什么这么设计,我也不清楚)

queue中的数据存储在 Node 对象中, 且 Node 具有以下的特点:

  • head.item 永远是 null, head是一个 dummy 节点, 所以进行 poll 时获取的是 head.next 的值
  • tail.next = null
/** Linked list node class */
/**
 * Linked 的数据节点, 这里有个特点, LinkedBlockingQueue 开始构建时会创建一个dummy节点(类似于 ConcurrentLinkedQueue)
 * 而整个队列里面的头节点都是 dummy 节点
 * @param <E>
 */
static class Node<E>{
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    /**
     * 在进行 poll 时 会将 node.next = node 来进行 help gc
     * next == null 指的是要么是队列的 tail 节点
     */
    Node<E> next;
    Node(E x){
        item = x;
    }
}

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list
 * Invariant: head.item == null
 * Head 节点的不变性 head.item == null <- 这是一个 dummy 节点(dummy 节点什么作用呢, 主要是方便代码, 不然的话需要处理一些 if 的判断, 加大代码的复杂度, 尤其是非阻塞的实现)
 */
transient Node<E> head;

/**
 * Tail of linked list
 * Invariant: last.next == null
 * Tail 节点的不变性 last.next == null <- 尾节点的 next 是 null
 */
private transient Node<E> last;

/** ReentrantLock Condition 的常见使用方式 */
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

LinkedBlockingQueue 构造函数
LinkedBlockingQueue的构造函数比较简单, 主要是初始化一下容量(默认 Integer.MAX_VALUE), 及 head, tail

/**
 * Creates a {@code KLinkedBlockingQueue} with the given (fixed) capacity
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity} is not greater
 *                                  than zero
 */
public KLinkedBlockingQueue(int capacity){
    if(capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity; // 指定 queue 的容量
    last = head = new Node<E>(null); // 默认的在 queue 里面 创建 一个 dummy 节点
}

添加元素 put方法
put 方法是将元素添加到队列尾部, queue满时进行await, 添加成功后容量还未满, 则进行 signal

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary for space to become available
 *
 *  将元素加入到 queue 的尾部
 * @param e
 * @throws InterruptedException
 */
public void put(E e) throws InterruptedException{
    if(e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negativeto indicate failure unless set.
    // 有趣的 变量 c 下面会有对它的讲解
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLocK = this.putLock;
    final AtomicInteger count = this.count;  // 获取 queue 的数量 count 
    putLocK.lockInterruptibly(); // 获取 put 的lock
    try {
        /**
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards
         */
        /**
         * 若 queue 的容量满了 则进行 await,直到有人进行通知
         * 那何时进行通知呢?
         * 有两种情况进行通知,
         *      (1) 有线程进行 put/offer 成功后且 (c + 1) < capacity 时
         *      (2) 在线程进行 take/poll 成功 且 (c == capacity) (PS: 这里的 c 指的是 在进行 take/poll 之前的容量)
         */

        while(count.get() == capacity){     // 容量满了, 进行等待
            notFull.await();
        }
        enqueue(node);                        // 进行节点的入队操作
        c = count.getAndIncrement();          // 进行节点个数的增加1, 返回原来的值
        if(c + 1 < capacity){               // 说明 现在的 put 操作后 queue 还没满
            notFull.signal();               // 唤醒其他在睡的线程
        }

    }finally {
        putLock.unlock();                   // 释放锁
    }
    if(c == 0){                             // c == 0 说明 原来queue是空的, 所以这里 signalNotEmpty 一下, 唤醒正在 poll/take 等待中的线程
        signalNotEmpty();
    }
}

/**
 * Links node at end of queue
 * 节点 入队列 (PS: 这里因为有个 dummy 节点, 不需要判空 <- 现在有点感觉 dummy 节点的作用了吧)
 * @param node the node
 */
private void enqueue(Node<E> node){
    // assert putLock.isHeldByCurrentThread()
    // assert last.next == null
    last = last.next = node;
}

/**
 * Signals a waiting take. Called only from put/offer (which do not
 * otherwise ordinarily lock takeLock.)
 */
private void signalNotEmpty(){
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    }finally {
        takeLock.unlock();
    }
}

代码的注释中基本把操作思想都说了, 有几个注意的地方

  • 当queue满时, 会调用 notFull.await() 进行等待, 而相应的唤醒的地方有两处, 一个是 “有线程进行 put/offer 成功后且 (c + 1) < capacity 时”, 另一处是 “在线程进行 take/poll 成功 且 (c == capacity) (PS: 这里的 c 指的是 在进行 take/poll 之前的容量)”
  • 代码中的 “signalNotEmpty” 这时在原来queue的数量 c (getAndIncrement的返回值是原来的值) ==0 时对此时在调用 take/poll 方法的线程进行唤醒

添加元素offer 方法

offer与put都是添加元素到queue的尾部, 只不过 put 方法在队列满时会进行阻塞, 直到成功; 而 offer 操作在容量满时直接返回 false.

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary up to the specified wait time for space to become available
 *
 *  支持中断和超时的 offer 节点
 *
 * @param e
 * @param timeout
 * @param unit
 * @return {@code true} if successful, or {@code false} if
 *          the specified waiting time elapses before space is available
 * @throws InterruptedException
 */
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException{
    if(e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;     // 获取 put lock
    final AtomicInteger count = this.count;         // 获取 queue 的容量
    putLock.lockInterruptibly();
    try {
        while(count.get() == capacity){             // queue的容量满了进行 带 timeout 的 await
            if(nanos <= 0){                           //  用光了 timeout 直接 return false
                return false;
            }
            nanos = notFull.awaitNanos(nanos);      // 直接 await (PS: 返回值 nanos <= 0 说明 等待是超时了, 正常 await 并且 被 signal nanos > 0; 具体详情会在 Condition 那一篇中详细说明)
        }
        enqueue(new Node<E>(e));                    // 节点若队列
        c = count.getAndIncrement();                // 获取入队列之前的容量
        if(c + 1 < capacity){                     // c + 1 < capacity 说明 现在的 offer 成功后 queue 还没满
            notFull.signal();                     // 唤醒其他正在 await 的线程
        }
    }finally {
        putLock.unlock();                           // 释放锁
    }
    if(c == 0){
        signalNotEmpty();                            // c == 0 说明 原来queue是空的, 所以这里 signalNotEmpty 一下, 唤醒正在 poll/take 等待中的线程
    }
    return true;
}

offer 整个操作和 put 差不多, 唯一变化的是多了一个 notFull.awaitNanos(nanos), 这个函数的返回值若是负数, 则说明等待超时, 则直接 return false (关于 Condition.awaitNanos 方法会在后续再说)

获取queue头元素 take 方法
此方法是获取 queue 中呆着时间最长的节点的值(head.next)

/**
 * 取走 queue 中呆着时间最长的节点的 item (其实就是 head.next.item 的值)
 * @return
 * @throws InterruptedException
 */
public E take() throws InterruptedException{
    E x;
    int c = -1;
    final AtomicInteger count = this.count;          // 获取 queue 的容量
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();                      // 获取 lock
    try {
        while(count.get() == 0){                      // queue 为空, 进行 await
            notEmpty.await();
        }
        x = dequeue();                                 // 将 head.next.item 的值取出, head = head.next
        c = count.getAndDecrement();                   // queue 的容量计数减一
        if(c > 1){
            notEmpty.signal();                        // c > 1 说明 进行 take 后 queue 还有值
        }
    }finally {
        takeLock.unlock();                              // 释放 lock
    }
    if(c == capacity){                                // c == capacity 说明一开始 queue 是满的, 调用 signalNotFull 进行唤醒一下 put/offer 的线程
        signalNotFull();
    }
    return x;
}

/**
 * Removes a node from head of queue
 * 节点出队列 这里有个注意点 head 永远是 dummy 节点, dequeue 的值是 head.next.item 的值
 * 在 dequeue 后 将 原  head 的后继节点设置为 head(成为了 dummy 节点)
 * @return the node
 */
private E dequeue(){
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;       // 这里的 head 是一个 dummy 节点
    Node<E> first = h.next; // 获取真正的节点
    h.next = h;             // help GC
    head = first;           // 重行赋值 head
    E x = first.item;       // 获取 dequeue 的值
    first.item = null;      // 将 item 置 空
    return x;
}

/** Signal a waiting put. Called only from take/poll */
private void signalNotFull(){
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    }finally {
        putLock.unlock();
    }
}

操作过程: 将 head.next 的值取出, 将 head.next 设置为新的head; 操作的步骤比较少, 只有两处 condition 的唤醒需要注意一下:

当 take 结束时, 判断 queue 是否还有元素 (c > 1) 来进行 notEmpty.signal()
当 take 结束时, 判断原先的容量是否已经满 (c == capacity) 来决定是否需要调用 signalNotFull 进行唤醒此刻还在等待 put/offer 的线程

获取queue头元素 poll 方法
poll 与 take 都是获取头节点的元素, 唯一的区别是 take在queue为空时进行await, poll 则直接返回

/**
 * 带 timeout 的poll 操作, 获取 head.next.item 的值
 * @param timeout
 * @param unit
 * @return
 * @throws InterruptedException
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException{
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);             //  计算超时时间
    final AtomicInteger count = this.count;       // 获取 queue 的容量
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();                   // 获取 lock
    try{
        while(count.get() == 0){                   // queue 为空, 进行 await
            if(nanos <= 0){                        // timeout 用光了, 直接 return null
                return null;
            }
            nanos = notEmpty.awaitNanos(nanos);   // 调用 condition 进行 await, 在 timeout之内进行 signal -> nanos> 0
        }
        x = dequeue();                             // 节点出queue
        c = count.getAndDecrement();               // 计算器减一
        if(c > 1){                                 // c > 1 说明 poll 后 容器内还有元素, 进行 换新 await 的线程
            notEmpty.signal();
        }
    }finally {
        takeLock.unlock();                         // 释放锁
    }
    if(c == capacity){                           // c == capacity 说明一开始 queue 是满的, 调用 signalNotFull 进行唤醒一下 put/offer 的线程
        signalNotFull();
    }
    return x;
}

LinkedBlockingQueue 是一个基于链表实现的阻塞queue, 它的性能好于 ArrayBlockingQueue, 但是差于 ConcurrentLinkeQueue; 并且它非常适于生产者消费者的环境中, 比如 Executors.newFixedThreadPool() 就是基于这个队列的。

LinkedBlockingQueue的静态工厂方法
使用LinkedBlockingQueue的好处:

因为线程大小固定的线程池,其线程的数量是不具备伸缩性的,当任务非常繁忙的时候,就势必会导致所有的线程都处于工作状态,导致队列满的情况发生,从而导致任务无法提交而抛出RejectedExecutionException;
而使用”无界”队列由于其良好的存储容量的伸缩性,可以很好的去缓冲任务繁忙情况下场景,即使任务非常多,也可以进行动态扩容,当任务被处理完成之后,队列中的节点也会被随之被GC回收,非常灵活。

FixedThreadPool的execute()说明

DNQXj0.png

DNQxBT.png

FixedThreadPool的execute()执行流程

  • 如果当前运行的线程数少于corePoolSize,则会创建新线程来执行任务;

  • 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue;

  • 线程执行完步骤A中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

使用Executors.newFixedThreadPool相关的方法,LinkedBlockingQueue的capacity为Integer.MAX_VALUE,所以会对线程池有以下影响:

  • 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize;

  • 由于A影响,使用”无界”队列时maximumPoolSize将是一个无效参数;

  • 由于A、B影响,使用”无界”队列时keepAliveTime将是一个无效参数;

  • 由于使用”无界队列”,运行中的FixedThreadPool(未执行shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionException.rejectedExecution方法)。

SingleThreadExecutor的execute()说明

DNlDvq.png

DNl6bT.png

SingleThreadExecutor的execute()执行流程

  • 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则会创建一个新线程来执行任务;

  • 在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入LinkedBlockingQueue;

  • 线程执行完步骤A中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务来执行。

SingleThreadExecutor使用”无界”的工作队列对线程池带来的影响与使用Executors.newFixedThreadPool相同。

ArrayBlockingQueue和LinkedBlockingQueue的比较

  • ArrayBlockingQueue由于其底层基于数组,并且在创建时指定存储的大小,在完成后就会立即在内存分配固定大小容量的数组元素,因此其存储通常有限;
  • 而LinkedBlockingQueue可以由用户指定最大存储容量,也可以无需指定,如果不指定则最大存储容量将是Integer.MAX_VALUE,由于其节点的创建都是动态创建,并且在节点出队列后可以被GC所回收,因此其具有灵活的伸缩性。
  • 但是由于ArrayBlockingQueue的有界性,因此其能够更好的对于性能进行预测,
  • 而LinkedBlockingQueue由于没有限制大小,当任务非常多的时候,不停地向队列中存储,就有可能导致内存溢出的情况发生。

  • ArrayBlockingQueue中在入队列和出队列操作过程中,使用的是同一个lock,所以即使在多核CPU的情况下,其读取和操作的都无法做到并行,而LinkedBlockingQueue的读取和插入操作所使用的锁是两个不同的lock,它们之间的操作互相不受干扰,因此两种操作可以并行完成,故LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue。

SynchronousQueue

每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。
SynchronousQueue的支持公平策略和非公平策略,所以底层可能两种数据结构:队列(实现公平策略)和栈(实现非公平策略),队列与栈都是通过链表来实现的。

SynchronousQueue的静态工厂方法

SynchronousQueue的execute()说明

DNG0oT.png

DNGrYF.png

可以看到CacheThreadPool的corePoolSize被设置为0,即corePool为空,maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是”无界”的,这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度,CacheThreadPool会不断创建新的线程,极端情况下,CacheThreadPool会因为创建过多线程而耗尽CPU和内存资源;keepAliveTime设置为60L,意味着CacheThreadPool中的空闲线程等待新任务的最长时间为60s,空闲线程超过60s将被终止。

SynchronousQueue的execute()执行流程

  • 首先执行SynchronousQueue.offer(E o)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(long timeout, TimeUnit unit),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行下面的步骤2;

  • 当初始maximumPool为空,或者maximumPool中当前没有空闲线程,将没有线程执行SynchronousQueue.poll(long timeout, TimeUnit unit)。这种情况下步骤1将失败。此时CacheThreadPool会创建一个新线程执行任务,execute()方法执行完成;

  • 在步骤2中创建的线程将任务执行完成后,会执行SynchronousQueue.poll(long timeout, TimeUnit unit)。这个poll操作会让线程最多在SynchronousQueue中等待60s。如果60s内主线程就提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于60s的空闲线程会被终止,因此长时间保持空闲的CacheThreadPool不会使用任何资源。