一、媒介
前面先容了利用CAS实现的非阻塞行列ConcurrentLinkedQueue,下面就来先容下利用独有锁实现的阻塞行列LinkedBlockingQueue的实现
二、 LinkedBlockingQueue类图布局
如图LinkedBlockingQueue中也有两个Node别离用来存放首尾节点,而且内里有个初始值为0的原子变量count用来记录行列元素个数,别的内里有两个ReentrantLock的独有锁,劳务派遣管理系统,别离用来节制元素入队和出队加锁,个中takeLock用来节制同时只有一个线程可以从行列获取元素,其他线程必需期待,putLock节制同时只能有一个线程可以获取锁去添加元素,其他线程必需期待。别的notEmpty和notFull用来实现入队和出队的同步。 别的由于进出队是两个非公正独有锁,所以可以同时又一个线程入队和一个线程出队,其实这个是个出产者-消费者模子。
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
/* Current number of elements /
private final AtomicInteger count = new AtomicInteger(0);
public static final int MAX_VALUE = 0x7fffffff; public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //初始化首尾节点 last = head = new Node<E>(null); }
如图默认行列容量为0x7fffffff;用户也可以本身指定容量。
三、必备基本
3.1 ReentrantLock
可以参考 https://www.atatech.org/articles/80539?flag_data_from=active
3.2 条件变量(Condition)
条件变量这里利用的是takeLock.newCondition()获取也就是说挪用ReentrantLock的要领获取的,那么可预见Condition利用了ReentrantLock的state。上面的参考没有提到所以这里串串讲下
如图ConditionObject中两个node别离用来存放条件行列的首尾节点,条件行列就是挪用条件变量的await要领被阻塞后的节点构成的单向链表。别的ConditionObject还要依赖AQS的state,ConditionObject是AQS类的一个内部类。
public final long awaitNanos(long nanosTimeout) throws InterruptedException { //假如间断符号被配置了,则抛异常 if (Thread.interrupted()) throw new InterruptedException(); //添加当前线程节点到条件行列, Node node = addConditionWaiter(); //当前线程释放独有锁 int savedState = fullyRelease(node); long lastTime = System.nanoTime(); int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } //挂起当前线程直到超时 LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; } //unpark后,当前线程从头获取锁,有大概获取不到被放到AQS的行列 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return nanosTimeout - (System.nanoTime() - lastTime); } final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); //释放锁,假如失败则抛异常 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
首先假如当前线程间断符号被配置了,直接抛出异常。添加当前线程节点(状态为:-2)到条件行列。
然后实验释放当前线程拥有的锁并生存当前计数,可知假如当前线程挪用awaitNano前没有利用当前条件变量地址的Reetenlock变量挪用lock可能lockInterruptibly获取到锁,会抛出IllegalMonitorStateException异常。
然后挪用park挂起当前线程直到超时可能其他线程挪用了当前线程的unpark要领,可能挪用了当前线程的interupt要领(这时候会抛异常)。