BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于put与take操作的原理是类似的。下面以ArrayBlockingQueue为例,来说明BlockingQueue的实现原理。
首先看一下ArrayBlockingQueue的构造函数,它初始化了put和take函数中用到的关键成员变量,这两个变量的类型分别是ReentrantLock和Condition。ReentrantLock是AbstractQueuedSynchronizer(AQS)的子类,它的newCondition函数返回的Condition实例,是定义在AQS类内部的ConditionObject类,该类可以直接调用AQS相关的函数。
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。我们会发现put函数使用了wait/notify的机制。与一般生产者-消费者的实现方式不同,同步队列使用ReentrantLock和Condition相结合的机制,即先获得锁,再等待,而不是synchronized和wait的机制。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
再来看一下消费者调用的take函数,take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
扩展阅读
await操作:
我们发现ArrayBlockingQueue并没有使用Object.wait,而是使用的Condition.await,这是为什么呢?Condition对象可以提供和Object的wait和notify一样的行为,但是后者必须先获取synchronized这个内置的monitor锁才能调用,而Condition则必须先获取ReentrantLock。这两种方式在阻塞等待时都会将相应的锁释放掉,但是Condition的等待可以中断,这是二者唯一的区别。
我们先来看一下Condition的await函数,await函数的流程大致如下图所示。await函数主要有三个步骤,一是调用addConditionWaiter函数,在condition wait queue队列中添加一个节点,代表当前线程在等待一个消息。然后调用fullyRelease函数,将持有的锁释放掉,调用的是AQS的函数。最后一直调用isOnSyncQueue函数判断节点是否被转移到sync queue队列上,也就是AQS中等待获取锁的队列。如果没有,则进入阻塞状态,如果已经在队列上,则调用acquireQueued函数重新获取锁。
signal操作:
signal函数将condition wait queue队列中队首的线程节点转移等待获取锁的sync queue队列中。这样的话,await函数中调用isOnSyncQueue函数就会返回true,导致await函数进入最后一步重新获取锁的状态。
我们这里来详细解析一下condition wait queue和sync queue两个队列的设计原理。condition wait queue是等待消息的队列,因为阻塞队列为空而进入阻塞状态的take函数操作就是在等待阻塞队列不为空的消息。而sync queue队列则是等待获取锁的队列,take函数获得了消息,就可以运行了,但是它还必须等待获取锁之后才能真正进行运行状态。
signal函数其实就做了一件事情,就是不断尝试调用transferForSignal函数,将condition wait queue队首的一个节点转移到sync queue队列中,直到转移成功。因为一次转移成功,就代表这个消息被成功通知到了等待消息的节点。
signal函数的示意图如下所示。
注意:本文归作者所有,未经作者允许,不得转载