The Mastermind Behind ReentrantLock: AbstractQueuedSynchronizer
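Once you start working with multithreading in Java, you inevitably end up using thread-safe data structures such as LinkedBlockingQueue, PriorityBlockingQueue, and ConcurrentHashMap. You may have noticed that many of them contain a ReentrantLock (for ConcurrentHashMap this applies to the segment-based implementation used before Java 8), and that its lock and unlock methods show up wherever data has to be synchronized. It's not hard to guess that this is the thing that actually implements locking.
Here is a quick summary of ReentrantLock:
- It is an exclusive lock. It offers the same basic behavior as the implicit monitor lock used by synchronized, but with extended capabilities.
- A thread that already holds the lock can acquire it again (nested acquisition is allowed), but don't nest deeper than 0x7fffffff times.
- Release the lock in a finally block, so that it is released correctly even when an exception is thrown.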
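To make the three points above concrete, here is a minimal sketch of the usual lock/try/finally pattern with one nested (reentrant) acquisition. The class and method names (ReentrantCounterSketch, incrementTwice, incrementNested) are made up for this example; only ReentrantLock and its lock, unlock, getHoldCount and isLocked methods come from the JDK.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantCounterSketch {
    // true asks for the fair ordering policy; the no-arg constructor is non-fair
    private final ReentrantLock lock = new ReentrantLock(true);
    private int counter;

    /** Takes the lock, then takes it again in a nested call; returns the hold count seen inside. */
    int incrementTwice() {
        lock.lock();
        try {
            counter++;
            return incrementNested();
        } finally {
            lock.unlock(); // runs even if the body throws
        }
    }

    private int incrementNested() {
        lock.lock(); // same thread, so this is a reentrant acquisition
        try {
            counter++;
            return lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    }

    int counter() { return counter; }

    boolean isLocked() { return lock.isLocked(); }
}
```

On a fresh instance, incrementTwice() returns 2 (the hold count inside the nested call), and the lock is free again once it returns.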
Next, let's see how it actually implements lock:
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;

    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    public void lock() {
        sync.lock();
    }
}
Huh, lock is that simple: it just calls sync.lock(). Let's take the fair lock as our example and see what FairSync is all about:
/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }
}
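Hmm? This in turn calls acquire(1), a method that comes from the superclass of FairSync's superclass (Sync). That class is the mastermind behind ReentrantLock:
AbstractQueuedSynchronizer
Come on, let's look at its acquire():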
/**
 * Acquires in exclusive mode, ignoring interrupts. Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success. Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success. This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument. This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
Ha, the if condition calls tryAcquire, which is actually implemented in FairSync:
/**
 * Fair version of tryAcquire. Don't grant access unless
 * recursive call or no waiters or is first.
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
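- Get the current thread and read the current state, which is 0 initially and whenever the lock is free.
- When state is 0: if there are no queued predecessors and our familiar CAS on state succeeds, the current thread is recorded as the exclusive owner and the acquisition succeeds. Otherwise the method returns false.
- When state is not 0 but the current thread is already the exclusive owner: acquires (1 here) is added to state, which counts the reentrant holds, and the method returns true. If the sum overflows to a negative value an Error is thrown, which is where the 0x7fffffff limit comes from.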
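The steps above can be tried out in isolation. The following is a minimal sketch, assuming we write our own synchronizer instead of touching the JDK's private FairSync: ToyFairSync and holds() are hypothetical names, the tryAcquire body mirrors the listing above, and a matching tryRelease is included only so that the example can run a full acquire/release round trip.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class ToyFairSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Lock is free: respect the queue, then race for state with a CAS.
            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // Reentrant path: only the owner gets here, so a plain write is enough.
            int next = c + acquires;
            if (next < 0) throw new Error("Maximum lock count exceeded");
            setState(next);
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        int c = getState() - releases;
        boolean free = (c == 0);
        if (free) setExclusiveOwnerThread(null);
        setState(c);
        return free;
    }

    /** Hypothetical helper exposing the reentrant hold count (the state). */
    int holds() { return getState(); }
}
```

Calling acquire(1) twice from one thread leaves holds() at 2; the first release(1) returns false because the lock is still held once, and the second returns true.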
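As we can see, the lock is ultimately acquired with an atomic CAS, and state is the number of reentrant holds. But what is this QueuedPredecessors thing? The acquireQueued(addWaiter(Node.EXCLUSIVE), arg) call in acquire also involves a queue. So what queue is that?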
/**
 * Wait queue node class.
 *
 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
 * Hagersten) lock queue. CLH locks are normally used for
 * spinlocks. We instead use them for blocking synchronizers, but
 * use the same basic tactic of holding some of the control
 * information about a thread in the predecessor of its node. A
 * "status" field in each node keeps track of whether a thread
 * should block. A node is signalled when its predecessor
 * releases. Each node of the queue otherwise serves as a
 * specific-notification-style monitor holding a single waiting
 * thread. The status field does NOT control whether threads are
 * granted locks etc though. A thread may try to acquire if it is
 * first in the queue. But being first does not guarantee success;
 * it only gives the right to contend. So the currently released
 * contender thread may need to rewait.
 *
 * <p>To enqueue into a CLH lock, you atomically splice it in as new
 * tail. To dequeue, you just set the head field.
 * <pre>
 *      +------+  prev +-----+       +-----+
 * head |      | <---- |     | <---- |     |  tail
 *      +------+       +-----+       +-----+
 * </pre>
 *
 * <p>Insertion into a CLH queue requires only a single atomic
 * operation on "tail", so there is a simple atomic point of
 * demarcation from unqueued to queued. Similarly, dequeuing
 * involves only updating the "head". However, it takes a bit
 * more work for nodes to determine who their successors are,
 * in part to deal with possible cancellation due to timeouts
 * and interrupts.
 *
 * <p>The "prev" links (not used in original CLH locks), are mainly
 * needed to handle cancellation. If a node is cancelled, its
 * successor is (normally) relinked to a non-cancelled
 * predecessor. For explanation of similar mechanics in the case
 * of spin locks, see the papers by Scott and Scherer at
 * http://www.cs.rochester.edu/u/scott/synchronization/
 *
 * <p>We also use "next" links to implement blocking mechanics.
 * The thread id for each node is kept in its own node, so a
 * predecessor signals the next node to wake up by traversing
 * next link to determine which thread it is. Determination of
 * successor must avoid races with newly queued nodes to set
 * the "next" fields of their predecessors. This is solved
 * when necessary by checking backwards from the atomically
 * updated "tail" when a node's successor appears to be null.
 * (Or, said differently, the next-links are an optimization
 * so that we don't usually need a backward scan.)
 *
 * <p>Cancellation introduces some conservatism to the basic
 * algorithms. Since we must poll for cancellation of other
 * nodes, we can miss noticing whether a cancelled node is
 * ahead or behind us. This is dealt with by always unparking
 * successors upon cancellation, allowing them to stabilize on
 * a new predecessor, unless we can identify an uncancelled
 * predecessor who will carry this responsibility.
 *
 * <p>CLH queues need a dummy header node to get started. But
 * we don't create them on construction, because it would be wasted
 * effort if there is never contention. Instead, the node
 * is constructed and head and tail pointers are set upon first
 * contention.
 *
 * <p>Threads waiting on Conditions use the same nodes, but
 * use an additional link. Conditions only need to link nodes
 * in simple (non-concurrent) linked queues because they are
 * only accessed when exclusively held. Upon await, a node is
 * inserted into a condition queue. Upon signal, the node is
 * transferred to the main queue. A special value of status
 * field is used to mark which queue a node is on.
 *
 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
 * Scherer and Michael Scott, along with members of JSR-166
 * expert group, for helpful ideas, discussions, and critiques
 * on the design of this class.
 */
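That's a long comment. Roughly: AQS maintains a doubly linked list (which it calls the CLH queue), and every node in it holds a thread and that thread's wait status. The head is either a dummy node or the node of the thread that most recently acquired the lock from the queue. When the lock is released, the head's successor is woken up and tries to acquire it (being first only gives the right to contend, so it may have to wait again). Any other thread that wants the lock is wrapped in a node, appended at the tail of the CLH queue, and parked. Conditions maintain a separate queue of their own, which we'll come back to later.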
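The queue itself is private, but ReentrantLock exposes a few monitoring methods that let us watch it fill up. Here is a small sketch under that assumption: WaitQueueProbe and observeQueuedWaiters are made-up names, and getQueueLength() is documented as an estimate, which is why the code polls (with a timeout) rather than reading it once.

```java
import java.util.concurrent.locks.ReentrantLock;

class WaitQueueProbe {
    /** Holds the lock, starts the given number of contenders, and reports how many got queued. */
    static int observeQueuedWaiters(int waiters) {
        ReentrantLock lock = new ReentrantLock(true);
        Thread[] threads = new Thread[waiters];
        int observed;
        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock();   // fails at first, so the thread is enqueued and parked
                    lock.unlock();
                });
                threads[i].start();
            }
            // Poll until every contender is in the wait queue (give up after 5 seconds).
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (lock.getQueueLength() < waiters && System.nanoTime() < deadline) {
                Thread.yield();
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();         // wakes the first queued thread
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return observed;
    }
}
```

While the calling thread holds the lock, each contender ends up as one node in the queue, so observeQueuedWaiters(3) is expected to report 3 once all three threads have parked.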
Let's look at hasQueuedPredecessors():
/**
 * @return {@code true} if there is a queued thread preceding the
 *         current thread, and {@code false} if the current thread
 *         is at the head of the queue or the queue is empty
 * @since 1.7
 */
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
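Let's go through the cases:
- h == t: either the queue has never been initialized (head and tail are both null) or it holds only the dummy head, so nobody is waiting and the method returns false. (The queue is created only once some thread has to wait, and a dummy head is added at that point; the thread that is really first in line is head.next.)
- h != t and head.next == null: another thread is in the middle of enqueueing itself, so it is ahead of us and the method returns true.
- h != t and head.next.thread != currentThread: another thread has been waiting longer than we have, so the method returns true.
So false is returned when the queue is empty, when it holds only the head node, or when the current thread's node is head.next. In those three cases the thread may go ahead and try the CAS to acquire the lock (if that doesn't click yet, it is worth reading again).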
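The case analysis above is easy to check mechanically. This is a toy model, not the JDK code: PredecessorCheckModel and its Node class are hypothetical stand-ins with only the two fields the expression reads, and head and tail are passed in as parameters instead of being fields of a synchronizer.

```java
class PredecessorCheckModel {
    /** Stripped-down stand-in for the AQS node: just the owning thread and the next link. */
    static final class Node {
        final Thread thread;
        Node next;
        Node(Thread thread) { this.thread = thread; }
    }

    /** Same boolean expression as hasQueuedPredecessors, with head/tail/current made explicit. */
    static boolean hasQueuedPredecessors(Node head, Node tail, Thread current) {
        Node s;
        return head != tail &&
            ((s = head.next) == null || s.thread != current);
    }

    /** Evaluates the expression for the cases discussed in the text. */
    static boolean[] cases() {
        Thread me = Thread.currentThread();
        Thread other = new Thread(() -> { }); // never started, only used as an identity

        // Queue never initialized: head == tail == null.
        boolean empty = hasQueuedPredecessors(null, null, me);

        // Queue holds only the dummy head.
        Node dummy = new Node(null);
        boolean onlyDummy = hasQueuedPredecessors(dummy, dummy, me);

        // Tail already swapped in by someone, but head.next not linked yet.
        Node unlinkedHead = new Node(null);
        boolean midEnqueue = hasQueuedPredecessors(unlinkedHead, new Node(other), me);

        // Our own node is head.next.
        Node mine = new Node(me);
        Node h1 = new Node(null);
        h1.next = mine;
        boolean meFirst = hasQueuedPredecessors(h1, mine, me);

        // Somebody else's node is head.next.
        Node theirs = new Node(other);
        Node h2 = new Node(null);
        h2.next = theirs;
        boolean otherFirst = hasQueuedPredecessors(h2, theirs, me);

        return new boolean[] { empty, onlyDummy, midEnqueue, meFirst, otherFirst };
    }
}
```

cases() yields false, false, true, false, true, matching the three "go ahead" cases and the two "someone is ahead of us" cases.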
Now tryAcquire makes sense. But what if it fails? Back in the if of acquire, the next thing is acquireQueued(addWaiter(Node.EXCLUSIVE), arg). First, addWaiter:
/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
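It creates a node and then, with the familiar linked-list operations, appends it at the tail and updates tail. The difference is that head and tail are updated with CAS (compareAndSetHead and compareAndSetTail, backed by native code), so those updates are atomic.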
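To see why a single CAS on the tail is enough, here is a rough re-creation of enq using AtomicReference in place of the JDK's internal CAS helpers. Everything in it (CasTailQueueSketch, countFromTail, concurrentEnqueue, the bare Node class) is a hypothetical name for illustration; the loop structure follows the enq listing above.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasTailQueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends node at the tail, creating the dummy head on first use; returns the predecessor. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazily install the dummy head; only one thread wins this CAS.
                if (head.compareAndSet(null, new Node()))
                    tail.set(head.get());
            } else {
                node.prev = t;                      // prev is set before the node becomes visible
                if (tail.compareAndSet(t, node)) {  // the single atomic "now queued" point
                    t.next = node;                  // next is filled in afterwards, best effort
                    return t;
                }
            }
        }
    }

    /** Counts queued nodes by walking prev links from the tail back to the dummy head. */
    int countFromTail() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev) n++;
        return n;
    }

    /** Lets several threads enqueue concurrently and returns how many nodes ended up queued. */
    static int concurrentEnqueue(int threads, int perThread) {
        CasTailQueueSketch q = new CasTailQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) q.enq(new Node());
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return q.countFromTail();
    }
}
```

Because prev is written before the CAS and the CAS either fully succeeds or is retried, no node is lost: concurrentEnqueue(4, 1000) counts 4000 nodes when walking backwards from the tail. That backward walk is also why the JDK code falls back to scanning from tail when a next link looks null.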
But is appending to the tail all there is to it? That would be too naive. Here's acquireQueued:
/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
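- First, keep in mind that all of this code runs in the thread that wants the lock.
- In the loop, if the node's predecessor is head (that is, the node is head.next), the thread calls tryAcquire again, because the holder may release the lock at any moment. On success the node becomes the new head and the method returns.
- If the thread should park (stop being scheduled and sleep until another thread unparks it), it parks, and once woken it goes around the loop again.
- So when is it woken next? Presumably when the lock holder calls unlock. Let's see what ReentrantLock's unlock does: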
// ReentrantLock
public void unlock() {
    sync.release(1);
}

// AbstractQueuedSynchronizer
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
It really just calls sync's release, and the tryRelease it invokes is implemented in Sync:
// Sync
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
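It simply subtracts 1 from state. No atomic operation is needed here, because only the owning thread can reach this code. Once every reentrant hold has been released, state is 0 (from that point on another thread's CAS can succeed) and tryRelease returns true, which takes us to the crucial unparkSuccessor: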
// AbstractQueuedSynchronizer
/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
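Now it's clear: releasing the lock does wake up the head's successor (ignoring cancelled nodes for now), and the woken thread then carries on with the rest of the loop in acquireQueued. Feels like everything just clicked, doesn't it?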
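The park/unpark handshake that acquireQueued and unparkSuccessor rely on can be shown on its own. This is only a sketch of the primitive, not of AQS: ParkUnparkSketch and parkUntilReleased are made-up names, and an AtomicBoolean stands in for "the lock was released". The re-check loop matters because LockSupport.park is allowed to return for no reason.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkSketch {
    /** Parks a waiter thread until the caller flips a flag and unparks it; returns whether it woke up. */
    static boolean parkUntilReleased() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean woke = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // Like the loop in acquireQueued: re-check the condition after every wake-up.
            while (!released.get()) {
                LockSupport.park();
            }
            woke.set(true);
        });
        waiter.start();

        released.set(true);          // publish the state change first...
        LockSupport.unpark(waiter);  // ...then hand the waiter its permit

        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return woke.get();
    }
}
```

The order on the releasing side mirrors release(): change the state, then unpark. If the unpark arrives before the waiter has parked, the permit is remembered and the next park returns immediately, so the wake-up is not lost.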
With that, the non-fair lock, NonfairSync, is easy to follow:
/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock. Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

// Sync
/**
 * Performs non-fair tryLock. tryAcquire is
 * implemented in subclasses, but both need nonfair
 * try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
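- The lock method starts with a CAS right away. That is what makes it non-fair: no queueing, grab first and ask later. If the grab succeeds, the CLH queue doesn't need to be touched at all: the barging thread was never enqueued, and head is only replaced in acquireQueued, when a queued thread gets the lock after a release.
- The only difference between nonfairTryAcquire and the fair tryAcquire is that it skips the hasQueuedPredecessors() check, which was explained above.
- Perfect~
As for acquireInterruptibly, it just checks the interrupt flag and throws an InterruptedException (on entry, and again when an interrupt wakes the thread while it is parked) instead of merely recording the interrupt.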
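From the outside, that interruptible path is reached through ReentrantLock.lockInterruptibly(). Here is a small sketch of what it looks like in practice; InterruptibleAcquireSketch and interruptQueuedWaiter are hypothetical names, and the polling on hasQueuedThread is only there so that the interrupt arrives after the waiter has been enqueued.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleAcquireSketch {
    /** Interrupts a thread that is queued for the lock and reports whether it saw the exception. */
    static boolean interruptQueuedWaiter() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            try {
                lock.lockInterruptibly(); // blocks, because the caller holds the lock
                lock.unlock();
            } catch (InterruptedException e) {
                sawInterrupt.set(true);   // gave up waiting and never owned the lock
            }
        });

        lock.lock();
        try {
            waiter.start();
            // Wait (up to 5 seconds) until the waiter's node is in the queue.
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (!lock.hasQueuedThread(waiter) && System.nanoTime() < deadline) {
                Thread.yield();
            }
            waiter.interrupt();
            try {
                waiter.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return sawInterrupt.get();
    }
}
```

With plain lock() the waiter would stay queued until it got the lock and only have its interrupt flag set again afterwards (the selfInterrupt() call in acquire); with lockInterruptibly() it leaves the queue and throws.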
By now we know how ReentrantLock locks and unlocks.
Condition
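A lock on its own only gives us mutual exclusion. What about coordinating data between threads?
With synchronized we can use object.wait()/notifyAll(); with ReentrantLock it's time to bring out Condition.
First let's look at the node field we skipped earlier, waitStatus: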
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;

/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;

/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;

/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;

/**
 * Status field, taking on only the values:
 *   SIGNAL:     The successor of this node is (or will soon be)
 *               blocked (via park), so the current node must
 *               unpark its successor when it releases or
 *               cancels. To avoid races, acquire methods must
 *               first indicate they need a signal,
 *               then retry the atomic acquire, and then,
 *               on failure, block.
 *   CANCELLED:  This node is cancelled due to timeout or interrupt.
 *               Nodes never leave this state. In particular,
 *               a thread with cancelled node never again blocks.
 *   CONDITION:  This node is currently on a condition queue.
 *               It will not be used as a sync queue node
 *               until transferred, at which time the status
 *               will be set to 0. (Use of this value here has
 *               nothing to do with the other uses of the
 *               field, but simplifies mechanics.)
 *   PROPAGATE:  A releaseShared should be propagated to other
 *               nodes. This is set (for head node only) in
 *               doReleaseShared to ensure propagation
 *               continues, even if other operations have
 *               since intervened.
 *   0:          None of the above
 *
 * The values are arranged numerically to simplify use.
 * Non-negative values mean that a node doesn't need to
 * signal. So, most code doesn't need to check for particular
 * values, just for sign.
 *
 * The field is initialized to 0 for normal sync nodes, and
 * CONDITION for condition nodes. It is modified using CAS
 * (or when possible, unconditional volatile writes).
 */
volatile int waitStatus;
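Two of these values need explaining:
SIGNAL: this node's successor needs to be woken up, because this node may release at any moment
CONDITION: this node is on a condition queue; when it is transferred to the sync queue, its status is set to 0
Now let's look at await: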
 1  /**
 2   * Implements interruptible condition wait.
 3   * <ol>
 4   * <li> If current thread is interrupted, throw InterruptedException.
 5   * <li> Save lock state returned by {@link #getState}.
 6   * <li> Invoke {@link #release} with
 7   *      saved state as argument, throwing
 8   *      IllegalMonitorStateException if it fails.
 9   * <li> Block until signalled or interrupted.
10   * <li> Reacquire by invoking specialized version of
11   *      {@link #acquire} with saved state as argument.
12   * <li> If interrupted while blocked in step 4, throw InterruptedException.
13   * </ol>
14   */
15  public final void await() throws InterruptedException {
16      if (Thread.interrupted())
17          throw new InterruptedException();
18      Node node = addConditionWaiter();
19      int savedState = fullyRelease(node);
20      int interruptMode = 0;
21      while (!isOnSyncQueue(node)) {
22          LockSupport.park(this);
23          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
24              break;
25      }
26      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
27          interruptMode = REINTERRUPT;
28      if (node.nextWaiter != null) // clean up if cancelled
29          unlinkCancelledWaiters();
30      if (interruptMode != 0)
31          reportInterruptAfterWait(interruptMode);
32  }
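- Line 18 (addConditionWaiter) creates a node whose status is Node.CONDITION and appends it to a waiter queue (also a linked list, plain list manipulation, so I'll skip it). This waiter queue is separate from the CLH queue.
- Line 19 (fullyRelease) gives up every reentrant hold on the lock.
- Line 21 keeps checking whether the node is on the sync queue (the CLH queue we have been talking about all along) and parks for as long as it is not. To be clear: even if the thread's old node is still the head of the CLH queue (head is only replaced when the next node acquires the lock), that doesn't matter here. The condition node is a new node, and isOnSyncQueue first checks whether its status is CONDITION, so it keeps returning false until... let's leave that as a cliffhanger.
- Once the node is on the CLH queue, the thread calls acquireQueued to reacquire the lock, and that works just as before.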
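The fullyRelease / savedState pair is easy to observe from user code. In this sketch (AwaitReleasesAllHolds and holdCountAfterAwait are hypothetical names) a thread takes the lock twice and then awaits. The signaller can only get the lock if await really dropped both holds, and after await returns the hold count should be back at 2.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitReleasesAllHolds {
    /** Awaits while holding the lock twice; returns the hold count after waking up (-1 if interrupted). */
    static int holdCountAfterAwait() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] done = { false }; // guarded by lock

        Thread signaller = new Thread(() -> {
            lock.lock(); // succeeds only after the waiter has released both holds
            try {
                done[0] = true;
                ready.signalAll();
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        lock.lock(); // hold count is now 2
        try {
            signaller.start();
            while (!done[0]) {
                ready.await(); // releases both holds, parks, then reacquires both
            }
            return lock.getHoldCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            // await reacquires the lock even when it throws, so both unlocks are always owed.
            lock.unlock();
            lock.unlock();
        }
    }
}
```

The expected result is 2: the saved state is handed to acquireQueued as its argument, so all reentrant holds are restored in one go.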
Still a bit confused? No problem, signalAll will clear things up. It ends up calling doSignalAll:
/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
doSignalAll walks over every node in the waiter queue and calls transferForSignal on it:
 1  /**
 2   * Transfers a node from a condition queue onto sync queue.
 3   * Returns true if successful.
 4   * @param node the node
 5   * @return true if successfully transferred (else the node was
 6   * cancelled before signal).
 7   */
 8  final boolean transferForSignal(Node node) {
 9      /*
10       * If cannot change waitStatus, the node has been cancelled.
11       */
12      if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
13          return false;
14
15      /*
16       * Splice onto queue and try to set waitStatus of predecessor to
17       * indicate that thread is (probably) waiting. If cancelled or
18       * attempt to set waitStatus fails, wake up to resync (in which
19       * case the waitStatus can be transiently and harmlessly wrong).
20       */
21      Node p = enq(node);
22      int ws = p.waitStatus;
23      if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
24          LockSupport.unpark(node.thread);
25      return true;
26  }
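Ha, this calls enq. Remember it? It appends the node to the tail of the CLH queue, so the node now sits in the queue that really competes for the lock.
One thing to note: line 23 sets the status of the node's predecessor to Node.SIGNAL, and if the predecessor is cancelled or that CAS fails, the node's thread is unparked right away. Why?
Because the thread is still parked in await's loop and hasn't reached acquireQueued yet. Without SIGNAL on its predecessor, nobody is guaranteed to wake it later, so it is woken here; it then leaves the !isOnSyncQueue(node) loop and sorts out its predecessor in acquireQueued.
Good, that settles everything. Each Condition has its own queue and they don't interfere with one another; a waiting node joins the CLH queue only when it is signalled. It's that simple.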
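To round this off, here is the classic use of two Conditions on one lock, as a minimal sketch: a bounded buffer where producers wait on one condition queue and consumers on another. TwoConditionBuffer, transferSum and the capacity of 2 are all made up for the example, and it uses signal() rather than the signalAll() walked through above, since each put or take frees exactly one slot or item.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TwoConditionBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final ArrayDeque<Integer> items = new ArrayDeque<>();
    private final int capacity;

    TwoConditionBuffer(int capacity) { this.capacity = capacity; }

    void put(int x) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) notFull.await(); // always re-check in a loop
            items.addLast(x);
            notEmpty.signal(); // moves one consumer node to the lock's wait queue
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) notEmpty.await();
            int x = items.removeFirst();
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    /** Pushes 1..n through a small buffer from a producer thread and sums what the caller takes. */
    static int transferSum(int n) {
        TwoConditionBuffer buf = new TwoConditionBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buf.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buf.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }
}
```

With Object.wait()/notifyAll() both kinds of waiters would share one wait set; here a put only ever wakes a consumer and a take only ever wakes a producer. transferSum(100) comes out as 5050.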
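A summary doesn't really fit into a sentence or two. Once it clicks, it clicks; if it hasn't yet, spend a bit more time with it. It really isn't that complicated~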