自从开始了解java多线程后,我们总避免不了使用一些线程安全的数据结构,如什么LinkedBlockingQueue啦、PriorityBlockingQueue啦还有诸如ConcurrentHashMap等等。不知道你们注意到没有,这些数据结构中都有一个ReentrantLock的东西,而且在关键的数据同步之处都有它的lock和unlock这两个方法,那不难猜测,这玩意儿就是来实现锁这个操作的。

来看一下ReentrantLock的简介:

  • 他是一个独占锁, 提供了像synchronized一样的隐式监视器锁的操作,但具有更强的扩展性

  • 在一个线程已经持有这个锁的时候,是可以重入的(允许嵌套的获取这个锁),但是别递归超过0x7fffffff次

  • 建议在final块中释放锁,这是为了保证在收到异常后也能正确释放锁

那下一步我们就要看看它到底是怎么实现lock操作的:

 1 public class ReentrantLock implements Lock, java.io.Serializable {
 2     private static final long serialVersionUID = 7373984872572414699L;
 3     
 4     /** Synchronizer providing all implementation mechanics */
 5     private final Sync sync;
 6     
 7     /**
 8      * Creates an instance of {@code ReentrantLock} with the
 9      * given fairness policy.
10      *
11      * @param fair {@code true} if this lock should use a fair ordering policy
12      */
13     public ReentrantLock(boolean fair) {
14         sync = fair ? new FairSync() : new NonfairSync();
15     }
16 
17     public void lock() {
18         sync.lock();
19     }
20 }

喔嚯,lock这么简单,只是调用了sync.lock(), 我们先拿公平锁来举例,去看看FairSync是什么鬼

 1 /**
 2  * Sync object for fair locks
 3  */
 4 static final class FairSync extends Sync {
 5     private static final long serialVersionUID = -3000897897090466540L;
 6 
 7     final void lock() {
 8         acquire(1);
 9     }
10 }

恩? 这又调用了acquire(1), 这个方法来自于FairSync的父类(Sync)的父类, 也就是ReentrantLock的幕后黑手:

AbstractQueuedSynchronizer

来来来,看看它的acquire():

 1 /**
 2  * Acquires in exclusive mode, ignoring interrupts.  Implemented
 3  * by invoking at least once {@link #tryAcquire},
 4  * returning on success.  Otherwise the thread is queued, possibly
 5  * repeatedly blocking and unblocking, invoking {@link
 6  * #tryAcquire} until success.  This method can be used
 7  * to implement method {@link Lock#lock}.
 8  *
 9  * @param arg the acquire argument.  This value is conveyed to
10  *        {@link #tryAcquire} but is otherwise uninterpreted and
11  *        can represent anything you like.
12  */
13 public final void acquire(int arg) {
14     if (!tryAcquire(arg) &&
15         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
16         selfInterrupt();
17 }

哈,这个if里面又去调用了tryAcquire, 而这玩意儿其实是在FairSync里面实现的:

 1 /**
 2  * Fair version of tryAcquire.  Don't grant access unless
 3  * recursive call or no waiters or is first.
 4  */
 5 protected final boolean tryAcquire(int acquires) {
 6     final Thread current = Thread.currentThread();
 7     int c = getState();
 8     if (c == 0) {
 9         if (!hasQueuedPredecessors() &&
10             compareAndSetState(0, acquires)) {
11             setExclusiveOwnerThread(current);
12             return true;
13         }
14     }
15     else if (current == getExclusiveOwnerThread()) {
16         int nextc = c + acquires;
17         if (nextc < 0)
18             throw new Error("Maximum lock count exceeded");
19         setState(nextc);
20         return true;
21     }
22     return false;
23 }
  1. 获取当前的线程, 并得到当前的状态,默认就是0
  2. 当state为0时, 如果没有排队的前驱,并且我们熟悉的CAS对state这块内存操作成功的话,就将当前线程设为独占那个锁线程,表明获得锁成功,反之返回失败
  3. 如果状态不是0, 但是当前线程就是之前设置过的独占线程, 将状态+acquires(也就是加1),代表重入次数,返回成功

我们可以看到,它本质上还是用CAS原子操作来获得这个锁的,并且state代表的就是重入的次数,不过这个QueuedPredecessors是什么呢?aquire里面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))怎么也碰到了这个queue,到底是什么queue呢?

 1 /**
 2  * Wait queue node class.
 3  *
 4  * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
 5  * Hagersten) lock queue. CLH locks are normally used for
 6  * spinlocks.  We instead use them for blocking synchronizers, but
 7  * use the same basic tactic of holding some of the control
 8  * information about a thread in the predecessor of its node.  A
 9  * "status" field in each node keeps track of whether a thread
10  * should block.  A node is signalled when its predecessor
11  * releases.  Each node of the queue otherwise serves as a
12  * specific-notification-style monitor holding a single waiting
13  * thread. The status field does NOT control whether threads are
14  * granted locks etc though.  A thread may try to acquire if it is
15  * first in the queue. But being first does not guarantee success;
16  * it only gives the right to contend.  So the currently released
17  * contender thread may need to rewait.
18  *
19  * <p>To enqueue into a CLH lock, you atomically splice it in as new
20  * tail. To dequeue, you just set the head field.
21  * <pre>
22  *      +------+  prev +-----+       +-----+
23  * head |      | <---- |     | <---- |     |  tail
24  *      +------+       +-----+       +-----+
25  * </pre>
26  *
27  * <p>Insertion into a CLH queue requires only a single atomic
28  * operation on "tail", so there is a simple atomic point of
29  * demarcation from unqueued to queued. Similarly, dequeing
30  * involves only updating the "head". However, it takes a bit
31  * more work for nodes to determine who their successors are,
32  * in part to deal with possible cancellation due to timeouts
33  * and interrupts.
34  *
35  * <p>The "prev" links (not used in original CLH locks), are mainly
36  * needed to handle cancellation. If a node is cancelled, its
37  * successor is (normally) relinked to a non-cancelled
38  * predecessor. For explanation of similar mechanics in the case
39  * of spin locks, see the papers by Scott and Scherer at
40  * http://www.cs.rochester.edu/u/scott/synchronization/
41  *
42  * <p>We also use "next" links to implement blocking mechanics.
43  * The thread id for each node is kept in its own node, so a
44  * predecessor signals the next node to wake up by traversing
45  * next link to determine which thread it is.  Determination of
46  * successor must avoid races with newly queued nodes to set
47  * the "next" fields of their predecessors.  This is solved
48  * when necessary by checking backwards from the atomically
49  * updated "tail" when a node's successor appears to be null.
50  * (Or, said differently, the next-links are an optimization
51  * so that we don't usually need a backward scan.)
52  *
53  * <p>Cancellation introduces some conservatism to the basic
54  * algorithms.  Since we must poll for cancellation of other
55  * nodes, we can miss noticing whether a cancelled node is
56  * ahead or behind us. This is dealt with by always unparking
57  * successors upon cancellation, allowing them to stabilize on
58  * a new predecessor, unless we can identify an uncancelled
59  * predecessor who will carry this responsibility.
60  *
61  * <p>CLH queues need a dummy header node to get started. But
62  * we don't create them on construction, because it would be wasted
63  * effort if there is never contention. Instead, the node
64  * is constructed and head and tail pointers are set upon first
65  * contention.
66  *
67  * <p>Threads waiting on Conditions use the same nodes, but
68  * use an additional link. Conditions only need to link nodes
69  * in simple (non-concurrent) linked queues because they are
70  * only accessed when exclusively held.  Upon await, a node is
71  * inserted into a condition queue.  Upon signal, the node is
72  * transferred to the main queue.  A special value of status
73  * field is used to mark which queue a node is on.
74  *
75  * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
76  * Scherer and Michael Scott, along with members of JSR-166
77  * expert group, for helpful ideas, discussions, and critiques
78  * on the design of this class.
79  */

好长一段,大致就是维护一个双向的链表(里面叫成CLH queue),里面的每一个node维护了一个thread以及它的状态,CLH的头节点的thread是锁的持有者,一旦head获取锁后,head的后继就会被唤醒,不断的去尝试获得锁(因为head随时会释放锁),需要获得锁的其他thread就会被包装成node放入CLH的尾部进行排队并休眠。这其中Conditions还维护了另一条queue,我们过会儿再说。

我们来看看hasQueuedPrdecessors()

 1 /* @return {@code true} if there is a queued thread preceding the
 2  *         current thread, and {@code false} if the current thread
 3  *         is at the head of the queue or the queue is empty
 4  * @since 1.7
 5  */
 6 public final boolean hasQueuedPredecessors() {
 7     // The correctness of this depends on head being initialized
 8     // before tail and on head.next being accurate if the current
 9     // thread is first in queue.
10     Node t = tail; // Read fields in reverse initialization order
11     Node h = head;
12     Node s;
13     return h != t &&
14         ((s = h.next) == null || s.thread != Thread.currentThread());
15 }

来分析一下几种情况:

  1. h == t,CLH要么就是有唯一的一个thread获得锁,要么CLH为空,干脆根本有没thread获得锁,那就返回false(只有在有thread等待时才会创建CLH,并且加入一个dummyHead,真正等待的是head.next)

  2. 队列的head.next.thread != currentThread, 说明有别的线程先于自己在等待,那么返回true

当queue为空或者是只有head一个node又或者当前线程的node就是head的next时,返回false,说明这三种情况可以调用CAS去尝试获得锁(理解不了就使劲去理解吧)

这下我们看懂了tryAcquire,不过要是tryAcquire失败了呢?回到acquire的if中,接下来有个acquireQueued(addWaiter(Node.EXCLUSIVE), arg)), 先看看addWaiter:

 1 /**
 2  * Creates and enqueues node for current thread and given mode.
 3  *
 4  * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 5  * @return the new node
 6  */
 7 private Node addWaiter(Node mode) {
 8     Node node = new Node(Thread.currentThread(), mode);
 9     // Try the fast path of enq; backup to full enq on failure
10     Node pred = tail;
11     if (pred != null) {
12         node.prev = pred;
13         if (compareAndSetTail(pred, node)) {
14             pred.next = node;
15             return node;
16         }
17     }
18     enq(node);
19     return node;
20 }
21 
22 
23 /**
24  * Inserts node into queue, initializing if necessary. See picture above.
25  * @param node the node to insert
26  * @return node's predecessor
27  */
28 private Node enq(final Node node) {
29     for (;;) {
30         Node t = tail;
31         if (t == null) { // Must initialize
32             if (compareAndSetHead(new Node()))
33                 tail = head;
34         } else {
35             node.prev = t;
36             if (compareAndSetTail(t, node)) {
37                 t.next = node;
38                 return t;
39             }
40         }
41     }
42 }

看起来就是新建一个node,然后通过我们熟悉的链表操作将它放到队尾去并更新tail,只不过对这些node的操作一样也是由native保证atomiclly的
不过光是放到队尾就好了吗?这样想就太naive了,来看acquireQueued:

 1 /**
 2  * Acquires in exclusive uninterruptible mode for thread already in
 3  * queue. Used by condition wait methods as well as acquire.
 4  *
 5  * @param node the node
 6  * @param arg the acquire argument
 7  * @return {@code true} if interrupted while waiting
 8  */
 9 final boolean acquireQueued(final Node node, int arg) {
10     boolean failed = true;
11     try {
12         boolean interrupted = false;
13         for (;;) {
14             final Node p = node.predecessor();
15             if (p == head && tryAcquire(arg)) {
16                 setHead(node);
17                 p.next = null; // help GC
18                 failed = false;
19                 return interrupted;
20             }
21             if (shouldParkAfterFailedAcquire(p, node) &&
22                 parkAndCheckInterrupt())
23                 interrupted = true;
24         }
25     } finally {
26         if (failed)
27             cancelAcquire(node);
28     }
29 }
  • 首先要清楚的是,这些代码都是在想要获得锁的这一个线程中完成的
  • 在循环中,如果当前node是head的直接next,还是会去tryAcquire,因为head随时会释放锁,如果成功就返回
  • 如果需要park的话(即关闭对此线程的调度,也就是进入休眠,指导有别的线程unpark它),就进入了休眠态,直到被唤醒再重新循环
  • 想一下,下次唤醒的是什么时候? 是不是当头节点unlock的时候?我们来看看ReentrantLock的unlock干了什么:
 1 // ReentrantLock
 2 public void unlock() {
 3     sync.release(1);
 4 }
 5 
 6 // AbstractQueuedSynchronizer
 7 public final boolean release(int arg) {
 8     if (tryRelease(arg)) {
 9         Node h = head;
10         if (h != null && h.waitStatus != 0)
11             unparkSuccessor(h);
12         return true;
13     }
14     return false;
15 }

实质上是调用了sync的release, 其中的tryRelease又在Sync实现:

 1 // Sync
 2 protected final boolean tryRelease(int releases) {
 3     int c = getState() - releases;
 4     if (Thread.currentThread() != getExclusiveOwnerThread())
 5         throw new IllegalMonitorStateException();
 6     boolean free = false;
 7     if (c == 0) {
 8         free = true;
 9         setExclusiveOwnerThread(null);
10     }
11     setState(c);
12     return free;
13 }

仅仅是将state-1这么简单, 当所有重入锁被释放完之后,state为0,这时候别的线程已经可以CAS成功了(这时候不需要原子操作,因为到目前为止都是独占的),返回true,会进入最关键的unparkSuccessor:

 1 // AbstractQueuedSynchronizer
 2 /**
 3  * Wakes up node's successor, if one exists.
 4  *
 5  * @param node the node
 6  */
 7 private void unparkSuccessor(Node node) {
 8     /*
 9      * If status is negative (i.e., possibly needing signal) try
10      * to clear in anticipation of signalling.  It is OK if this
11      * fails or if status is changed by waiting thread.
12      */
13     int ws = node.waitStatus;
14     if (ws < 0)
15         compareAndSetWaitStatus(node, ws, 0);
16 
17     /*
18      * Thread to unpark is held in successor, which is normally
19      * just the next node.  But if cancelled or apparently null,
20      * traverse backwards from tail to find the actual
21      * non-cancelled successor.
22      */
23     Node s = node.next;
24     if (s == null || s.waitStatus > 0) {
25         s = null;
26         for (Node t = tail; t != null && t != node; t = t.prev)
27             if (t.waitStatus <= 0)	
28                 s = t;
29     }
30     if (s != null)
31         LockSupport.unpark(s.thread);
32 }

这下clear了, 确实是在释放头节点的时候,会唤醒它的后继节点(我们先不考虑cancel掉的), 这时候,被唤醒的thread就会执行之前acquireQueued里面剩下的循环了。 有没有感觉仁督二脉被打通了~

现在来看非公平锁NonfairSync就很清楚了:

 1 /**
 2  * Sync object for non-fair locks
 3  */
 4 static final class NonfairSync extends Sync {
 5     private static final long serialVersionUID = 7316153563782823691L;
 6 
 7     /**
 8      * Performs lock.  Try immediate barge, backing up to normal
 9      * acquire on failure.
10      */
11     final void lock() {
12         if (compareAndSetState(0, 1))
13             setExclusiveOwnerThread(Thread.currentThread());
14         else
15             acquire(1);
16     }
17 
18     protected final boolean tryAcquire(int acquires) {
19         return nonfairTryAcquire(acquires);
20     }
21 }
22 
23 // Sync
24 /**
25  * Performs non-fair tryLock.  tryAcquire is
26  * implemented in subclasses, but both need nonfair
27  * try for trylock method.
28  */
29 final boolean nonfairTryAcquire(int acquires) {
30     final Thread current = Thread.currentThread();
31     int c = getState();
32     if (c == 0) {
33         if (compareAndSetState(0, acquires)) {
34             setExclusiveOwnerThread(current);
35             return true;
36         }
37     }
38     else if (current == getExclusiveOwnerThread()) {
39         int nextc = c + acquires;
40         if (nextc < 0) // overflow
41             throw new Error("Maximum lock count exceeded");
42         setState(nextc);
43         return true;
44     }
45     return false;
46 }
  • lock方法以上来先会CAS一次,这就体现了非公平的特性,哥不排队,哥抢了再说,如果抢到的话,也不用操作CLH queue,因为head是在acquireQueued里面释放的,这就等到release再做就行了
  • nofairTryAcquire跟tryAcquire的唯一区别就是不需要判断hasQueuedPredecessors(),上面已经解释过它是什么了
  • 完美~

还有什么acquireInterruptibly只不过是检查一下中断标志位再抛出一个InterruptException罢了

至此,我们已经知道了ReentrantLock是如何上锁和解锁的

Condition

要实现lock说到底只能实现互斥,那多线程之间的数据同步怎么办? 用synchronized的时候我们可以用object.wait()/notifAll(), 在ReentrantLock里就要祭出Condition啦
我们先来看看刚才没有提到的node的waitStatues:

 1 /** waitStatus value to indicate thread has cancelled */
 2 static final int CANCELLED =  1;
 3 
 4 /** waitStatus value to indicate successor's thread needs unparking */
 5 static final int SIGNAL    = -1;
 6 
 7 /** waitStatus value to indicate thread is waiting on condition */
 8 static final int CONDITION = -2;
 9 
10 /**
11  * waitStatus value to indicate the next acquireShared should
12  * unconditionally propagate
13  */
14 static final int PROPAGATE = -3;
15 
16 /**
17  * Status field, taking on only the values:
18  *   SIGNAL:     The successor of this node is (or will soon be)
19  *               blocked (via park), so the current node must
20  *               unpark its successor when it releases or
21  *               cancels. To avoid races, acquire methods must
22  *               first indicate they need a signal,
23  *               then retry the atomic acquire, and then,
24  *               on failure, block.
25  *   CANCELLED:  This node is cancelled due to timeout or interrupt.
26  *               Nodes never leave this state. In particular,
27  *               a thread with cancelled node never again blocks.
28  *   CONDITION:  This node is currently on a condition queue.
29  *               It will not be used as a sync queue node
30  *               until transferred, at which time the status
31  *               will be set to 0. (Use of this value here has
32  *               nothing to do with the other uses of the
33  *               field, but simplifies mechanics.)
34  *   PROPAGATE:  A releaseShared should be propagated to other
35  *               nodes. This is set (for head node only) in
36  *               doReleaseShared to ensure propagation
37  *               continues, even if other operations have
38  *               since intervened.
39  *   0:          None of the above  
40  *
41  * The values are arranged numerically to simplify use.
42  * Non-negative values mean that a node doesn't need to
43  * signal. So, most code doesn't need to check for particular
44  * values, just for sign.
45  *
46  * The field is initialized to 0 for normal sync nodes, and
47  * CONDITION for condition nodes.  It is modified using CAS
48  * (or when possible, unconditional volatile writes).
49  */
50 volatile int waitStatus;

主要来解释两个参数:

SIGNAL:当前node的next需要被唤醒,因为随时可能release  
CONDITION:当前node在condition队列中,当它被转移到同步队列中,会把状态设为0  

然后再来看看await:

 1 /**
 2  * Implements interruptible condition wait.
 3  * <ol>
 4  * <li> If current thread is interrupted, throw InterruptedException.
 5  * <li> Save lock state returned by {@link #getState}.
 6  * <li> Invoke {@link #release} with
 7  *      saved state as argument, throwing
 8  *      IllegalMonitorStateException if it fails.
 9  * <li> Block until signalled or interrupted.
10  * <li> Reacquire by invoking specialized version of
11  *      {@link #acquire} with saved state as argument.
12  * <li> If interrupted while blocked in step 4, throw InterruptedException.
13  * </ol>
14  */
15 public final void await() throws InterruptedException {
16     if (Thread.interrupted())
17         throw new InterruptedException();
18     Node node = addConditionWaiter();
19     int savedState = fullyRelease(node);
20     int interruptMode = 0;
21     while (!isOnSyncQueue(node)) {
22         LockSupport.park(this);
23         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
24             break;
25     }
26     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
27         interruptMode = REINTERRUPT;
28     if (node.nextWaiter != null) // clean up if cancelled
29         unlinkCancelledWaiters();
30     if (interruptMode != 0)
31         reportInterruptAfterWait(interruptMode);
32 }
  • 在18行,会创建一个node,状态会标记为Node.CONDITION,并且加入到一个waiter队列中(也是链表实现的,就是链表的操作,就不说了),并且这个waiter队列与CLH没有半毛钱关系
  • 19行fullyRelease会完全放弃所有的重入锁
  • 21行不断检查当前的node是否在SyncQueue(也就是之前一直说的CLH queue中),如果不在,就会park阻塞。要明确的是,此时虽然thread属于head并且也没有将head移出CLH(因为只有在next node获得锁的时候才会重设head),但是isOnSyncQueue会优先判断是不是CONDITION状态,所以它一直返回false,直到!!!留个悬念
  • 当在CLH队列中时,重新去acquireQueue,这就跟之前是一样的了

是不是还有点糊涂,没事,看完signalAll你就知道了 它最终会调用doSignalAll:

 1 /**
 2  * Removes and transfers all nodes.
 3  * @param first (non-null) the first node on condition queue
 4  */
 5 private void doSignalAll(Node first) {
 6     lastWaiter = firstWaiter = null;
 7     do {
 8         Node next = first.nextWaiter;
 9         first.nextWaiter = null;
10         transferForSignal(first);
11         first = next;
12     } while (first != null);
13 }

doSignalAll里遍历waiter队列里的每个node,调用transferForSignal:

 1 /**
 2  * Transfers a node from a condition queue onto sync queue.
 3  * Returns true if successful.
 4  * @param node the node
 5  * @return true if successfully transferred (else the node was
 6  * cancelled before signal).
 7  */
 8 final boolean transferForSignal(Node node) {
 9     /*
10      * If cannot change waitStatus, the node has been cancelled.
11      */
12     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
13         return false;
14 
15     /*
16      * Splice onto queue and try to set waitStatus of predecessor to
17      * indicate that thread is (probably) waiting. If cancelled or
18      * attempt to set waitStatus fails, wake up to resync (in which
19      * case the waitStatus can be transiently and harmlessly wrong).
20      */
21     Node p = enq(node);
22     int ws = p.waitStatus;
23     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
24         LockSupport.unpark(node.thread);
25     return true;
26 }

哈哈,这里不就调用了enq吗,还记得它吗?就是把node加到CLH的队尾啦,这样就可以放入真正的抢占锁的队列中了
不过有一点要注意的是,23行会设置当前节点的前驱状态为Node.SIGNAL,如果失败的话,还要主动去把这个node给unpark,这是为什么呢
因为这个node现在其实是阻塞在await的循环中,并没有走到acquireQueued中,所以要将它唤醒,跳出!isOnSyncQueue(node)循环才行

好了,这下所有问题都解决了,这样看来各个Condition之间有单独的队列互相不影响,等到signal的时候才会将等待node加入CLH中,就是那么简单

总结也是一两句话说不清的,总之理解了就是理解了,理解不了您就再多琢磨琢磨吧,其实并没有很复杂~