共享锁:ReentrantReadWriteLock
上一篇文章讲了ReentrantLock,我们知道它是一种互斥锁 但是要知道,ReentrantLock的本质其实是AbstractQueuedSynchronizer,而它不仅可以用来实现互斥锁,还能来实现共享锁 什么是共享锁?比如说对一个文件进行操作,对于写操作必须要用到互斥锁来保证数据唯一性,但是当读的时候呢,如果允许同时并行多个读操作,那将大大提高效率,只要保证所有的读操作与写操作之间互斥就可以了,读操作之间是可以共享的。 幸运的是,java.concurrent包中已经有了这种锁的实现,今天我们就来看看它到底是怎么做的吧。
1 public class ReentrantReadWriteLock
2 implements ReadWriteLock, java.io.Serializable {
3 private static final long serialVersionUID = -6992448646407690164L;
4 /** Inner class providing readlock */
5 private final ReentrantReadWriteLock.ReadLock readerLock;
6 /** Inner class providing writelock */
7 private final ReentrantReadWriteLock.WriteLock writerLock;
8 /** Performs all synchronization mechanics */
9 final Sync sync;
10
11 /**
12 * Creates a new {@code ReentrantReadWriteLock} with
13 * the given fairness policy.
14 *
15 * @param fair {@code true} if this lock should use a fair ordering policy
16 */
17 public ReentrantReadWriteLock(boolean fair) {
18 sync = fair ? new FairSync() : new NonfairSync();
19 readerLock = new ReadLock(this);
20 writerLock = new WriteLock(this);
21 }
22 }
简单说一下,ReentrantReadWriteLock中有一个readrLock和一个writerLock,通过这它们可以实现读者写着之间的互斥以及共享操作 不过要注意,其实它们真正的实现都是那个熟悉的Sync。在ReentrantReadWriteLock中,有公平和非公平两种模式:在非公平模式下:当一个锁释放了,不能确定到底是writer等待线程获得写锁,还是readr等待线程们获得读锁;在公平模式下:一个锁释放了,选择等待最久的一个(组)线程获得锁,无论是writer线程还是reader线程们 这样看来,貌似其实整个ReentrantReadWriteLock里面只有一把锁,就是那个Sync! 下面开始揭秘,我们选择公平模式来分析,先看ReadLock的lock好了:
1 // ReadLock
2 /**
3 * Acquires the read lock.
4 *
5 * <p>Acquires the read lock if the write lock is not held by
6 * another thread and returns immediately.
7 *
8 * <p>If the write lock is held by another thread then
9 * the current thread becomes disabled for thread scheduling
10 * purposes and lies dormant until the read lock has been acquired.
11 */
12 public void lock() {
13 sync.acquireShared(1);
14 }
15
16 // AbstractQueuedSynchronizer
17 public final void acquireShared(int arg) {
18 if (tryAcquireShared(arg) < 0)
19 doAcquireShared(arg);
20 }
他会调用AbstractQueuedSynchronizer里的acquireShared,后者会先调用Sync里的tryAcquireShared:
1 // Sync in ReetrantReadWriteLock
2
3 protected final int tryAcquireShared(int unused) {
4 Thread current = Thread.currentThread();
5 int c = getState();
6 if (exclusiveCount(c) != 0 &&
7 getExclusiveOwnerThread() != current)
8 return -1;
9 int r = sharedCount(c);
10 if (!readerShouldBlock() &&
11 r < MAX_COUNT &&
12 compareAndSetState(c, c + SHARED_UNIT)) {
13 if (r == 0) {
14 firstReader = current;
15 firstReaderHoldCount = 1;
16 } else if (firstReader == current) {
17 firstReaderHoldCount++;
18 } else {
19 HoldCounter rh = cachedHoldCounter;
20 if (rh == null || rh.tid != current.getId())
21 cachedHoldCounter = rh = readHolds.get();
22 else if (rh.count == 0)
23 readHolds.set(rh);
24 rh.count++;
25 }
26 return 1;
27 }
28 return fullTryAcquireShared(current);
29 }
- 第3行的if先判断write重入锁的次数以及当前的独占线程是不是自己,若果都不满足,说明writer正在持有锁,返回负值表示失败
- 第7行会先判断读者是否应该被阻塞,至此,应该知道当前是readers获得了锁,因此就尝试通过CAS增加一个reader,最后只要用一个ThreadLocal类型的readHolds来记录当前线程的重入次数就行了
- 如果上面条件都不成立,会进入fullTryAcquireShared
不过我们先来看看readerShouldBlock做了什么: 非公平状态下会调用AbstractQueuedSynchronizer中的:
1 /**
2 * Returns {@code true} if the apparent first queued thread, if one
3 * exists, is waiting in exclusive mode. If this method returns
4 * {@code true}, and the current thread is attempting to acquire in
5 * shared mode (that is, this method is invoked from {@link
6 * #tryAcquireShared}) then it is guaranteed that the current thread
7 * is not the first queued thread. Used only as a heuristic in
8 * ReentrantReadWriteLock.
9 */
10 final boolean apparentlyFirstQueuedIsExclusive() {
11 Node h, s;
12 return (h = head) != null &&
13 (s = h.next) != null &&
14 !s.isShared() &&
15 s.thread != null;
16 }
意思就是CLH中有一个等待获取锁的node,并且它不是share模式的(这很重要),这就代表着它会考虑writer线程,不会让writer线程永远等下去,因此writer排在第一位(head是锁持有者,head.next是第一个等待者)的时候,会返回true,告诉当前线程,应该block,如果是个reader那就不用等了这就是非公平的含义 顺个便来看一下公平模式中是怎么做的:
1 static final class FairSync extends Sync {
2 private static final long serialVersionUID = -2274990926593161451L;
3 final boolean writerShouldBlock() {
4 return hasQueuedPredecessors();
5 }
6 final boolean readerShouldBlock() {
7 return hasQueuedPredecessors();
8 }
9 }
啊哈,就是看是否有一个node正在等待获得锁,它不考虑是否是share模式还是非share模式,这就是所谓的公平 回过头来再看fullTryAcquireShared,也就是有机会获得锁但是又无法获得锁的情况(无论是别的thread优先级比自己高,还是reader数量超MAX或者是CAS失败):
1 final int fullTryAcquireShared(Thread current) {
2 /*
3 * This code is in part redundant with that in
4 * tryAcquireShared but is simpler overall by not
5 * complicating tryAcquireShared with interactions between
6 * retries and lazily reading hold counts.
7 */
8 HoldCounter rh = null;
9 for (;;) {
10 int c = getState();
11 if (exclusiveCount(c) != 0) {
12 if (getExclusiveOwnerThread() != current)
13 return -1;
14 // else we hold the exclusive lock; blocking here
15 // would cause deadlock.
16 } else if (readerShouldBlock()) {
17 // Make sure we're not acquiring read lock reentrantly
18 if (firstReader == current) {
19 // assert firstReaderHoldCount > 0;
20 } else {
21 if (rh == null) {
22 rh = cachedHoldCounter;
23 if (rh == null || rh.tid != current.getId()) {
24 rh = readHolds.get();
25 if (rh.count == 0)
26 readHolds.remove();
27 }
28 }
29 if (rh.count == 0)
30 return -1;
31 }
32 }
33 if (sharedCount(c) == MAX_COUNT)
34 throw new Error("Maximum lock count exceeded");
35 if (compareAndSetState(c, c + SHARED_UNIT)) {
36 if (sharedCount(c) == 0) {
37 firstReader = current;
38 firstReaderHoldCount = 1;
39 } else if (firstReader == current) {
40 firstReaderHoldCount++;
41 } else {
42 if (rh == null)
43 rh = cachedHoldCounter;
44 if (rh == null || rh.tid != current.getId())
45 rh = readHolds.get();
46 else if (rh.count == 0)
47 readHolds.set(rh);
48 rh.count++;
49 cachedHoldCounter = rh; // cache for release
50 }
51 return 1;
52 }
53 }
54 }
注释也说了,我们也看到了,整个逻辑几乎跟tryAcquireShared一模一样,只不过加了无限循环保证CAS成功。 值得注意的是,唯一不同之处在于16行的else,在有别的线程等待获取锁的状态下(且它们的优先级更高),如果当前所有readers都释放锁的话(cachedHold .count == 0) 那么当前线程也不会再去抢夺这个锁了,返回-1;如果当前还有readers占有锁,那么本线程也不客气,还是会去尝试CAS,毕竟现在是readers的天下
好了,如果当前线程没有成功拿到锁,那么回到acquireShared中去做doAcquireShared:
1 /**
2 * Acquires in shared uninterruptible mode.
3 * @param arg the acquire argument
4 */
5 private void doAcquireShared(int arg) {
6 final Node node = addWaiter(Node.SHARED);
7 boolean failed = true;
8 try {
9 boolean interrupted = false;
10 for (;;) {
11 final Node p = node.predecessor();
12 if (p == head) {
13 int r = tryAcquireShared(arg);
14 if (r >= 0) {
15 setHeadAndPropagate(node, r);
16 p.next = null; // help GC
17 if (interrupted)
18 selfInterrupt();
19 failed = false;
20 return;
21 }
22 }
23 if (shouldParkAfterFailedAcquire(p, node) &&
24 parkAndCheckInterrupt())
25 interrupted = true;
26 }
27 } finally {
28 if (failed)
29 cancelAcquire(node);
30 }
31 }
有没有很熟悉?跟ReentrantLock的acquireQueued如出一辙,不同的是这里调用了setHeadAndPropagate,就是要把后面连续的readers都唤醒,因为这里已经是readers的天下啦 不过要注意的是,这里addWaiter中,会把当前node标记为share状态,那么在之前的非公平模式的readerShouldBlock中检查到等待的是一个reader,那就会返回false,代表它不会考虑别的在等待的readers的感受,而公平模式中会检查这个标记,代表别的等待状态的readers有机会使当前获得锁的readers进入block状态
上锁讲完了,接下来看看readers是怎么unlock的:
1 // ReadLock
2 /**
3 * Attempts to release this lock.
4 *
5 * <p> If the number of readers is now zero then the lock
6 * is made available for write lock attempts.
7 */
8 public void unlock() {
9 sync.releaseShared(1);
10 }
11
12 // AbstractQueuedSynchronizer
13 /**
14 * Releases in shared mode. Implemented by unblocking one or more
15 * threads if {@link #tryReleaseShared} returns true.
16 *
17 * @param arg the release argument. This value is conveyed to
18 * {@link #tryReleaseShared} but is otherwise uninterpreted
19 * and can represent anything you like.
20 * @return the value returned from {@link #tryReleaseShared}
21 */
22 public final boolean releaseShared(int arg) {
23 if (tryReleaseShared(arg)) {
24 doReleaseShared();
25 return true;
26 }
27 return false;
28 }
还是先来看看Sync的tryRelease吧,这就没有公平不公平之分了:
1 protected final boolean tryReleaseShared(int unused) {
2 Thread current = Thread.currentThread();
3 if (firstReader == current) {
4 // assert firstReaderHoldCount > 0;
5 if (firstReaderHoldCount == 1)
6 firstReader = null;
7 else
8 firstReaderHoldCount--;
9 } else {
10 HoldCounter rh = cachedHoldCounter;
11 if (rh == null || rh.tid != current.getId())
12 rh = readHolds.get();
13 int count = rh.count;
14 if (count <= 1) {
15 readHolds.remove();
16 if (count <= 0)
17 throw unmatchedUnlockException();
18 }
19 --rh.count;
20 }
21 for (;;) {
22 int c = getState();
23 int nextc = c - SHARED_UNIT;
24 if (compareAndSetState(c, nextc))
25 // Releasing the read lock has no effect on readers,
26 // but it may allow waiting writers to proceed if
27 // both read and write locks are now free.
28 return nextc == 0;
29 }
30 }
还是CAS操作,很简单,不过要注意的是cachedHoldCounter,这就是为什么之前的tryAcquireShared里面要用到它的原因了,它能够反映最近一次release的情况 当全部readers(包括自己的重入)都release完之后,进入AbstractQueuedSynchronizer的doRelease:
1 /**
2 * Release action for shared mode -- signal successor and ensure
3 * propagation. (Note: For exclusive mode, release just amounts
4 * to calling unparkSuccessor of head if it needs signal.)
5 */
6 private void doReleaseShared() {
7 /*
8 * Ensure that a release propagates, even if there are other
9 * in-progress acquires/releases. This proceeds in the usual
10 * way of trying to unparkSuccessor of head if it needs
11 * signal. But if it does not, status is set to PROPAGATE to
12 * ensure that upon release, propagation continues.
13 * Additionally, we must loop in case a new node is added
14 * while we are doing this. Also, unlike other uses of
15 * unparkSuccessor, we need to know if CAS to reset status
16 * fails, if so rechecking.
17 */
18 for (;;) {
19 Node h = head;
20 if (h != null && h != tail) {
21 int ws = h.waitStatus;
22 if (ws == Node.SIGNAL) {
23 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
24 continue; // loop to recheck cases
25 unparkSuccessor(h);
26 }
27 else if (ws == 0 &&
28 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
29 continue; // loop on failed CAS
30 }
31 if (h == head) // loop if head changed
32 break;
33 }
34 }
如果后继节点需要被唤醒那就唤醒咯,如果不需要,就把它标记为PROPAGATE来告诉后面的节点所有readers都unlock了 不过有人要问了,为什么要用一个for循环还要用CAS呢?难道有人在跟它做同样的事?没错,就是之前doAcquireShared里面的setHeadAndPropagate:
1 /**
2 * Sets head of queue, and checks if successor may be waiting
3 * in shared mode, if so propagating if either propagate > 0 or
4 * PROPAGATE status was set.
5 *
6 * @param node the node
7 * @param propagate the return value from a tryAcquireShared
8 */
9 private void setHeadAndPropagate(Node node, int propagate) {
10 Node h = head; // Record old head for check below
11 setHead(node);
12 /*
13 * Try to signal next queued node if:
14 * Propagation was indicated by caller,
15 * or was recorded (as h.waitStatus) by a previous operation
16 * (note: this uses sign-check of waitStatus because
17 * PROPAGATE status may transition to SIGNAL.)
18 * and
19 * The next node is waiting in shared mode,
20 * or we don't know, because it appears null
21 *
22 * The conservatism in both of these checks may cause
23 * unnecessary wake-ups, but only when there are multiple
24 * racing acquires/releases, so most need signals now or soon
25 * anyway.
26 */
27 if (propagate > 0 || h == null || h.waitStatus < 0) {
28 Node s = node.next;
29 if (s == null || s.isShared())
30 doReleaseShared();
31 }
32 }
啊哈,这就是为什么readers等待group能够连续获得锁的原因了,而且连续的unparkSuccessor也会导致多个线程同时进行doReleaseShared操作,reader线程被唤醒后head也会随着改变,这就是为什么需要for循环和CAS保证原子性还有用PROPAGATE标记head已经将successor唤醒的原因了
光是一个ReadLock就已经那么累了,下面进入WriteLock
我们可以从ReadLock中看出一些蛛丝马迹,writers并不是共享的持有锁,一次只能有一个writer进行写操作。那么,WriteLock是不是会和我们之前认识的ReetrantLock(独占锁)大同小异呢?我们来探一探究竟
照例,还是从WriteLock的lock开始,它会调用Sync也就是AbstractQueuedSynchronizer里的acquire:
1 /**
2 * Acquires in exclusive mode, ignoring interrupts. Implemented
3 * by invoking at least once {@link #tryAcquire},
4 * returning on success. Otherwise the thread is queued, possibly
5 * repeatedly blocking and unblocking, invoking {@link
6 * #tryAcquire} until success. This method can be used
7 * to implement method {@link Lock#lock}.
8 *
9 * @param arg the acquire argument. This value is conveyed to
10 * {@link #tryAcquire} but is otherwise uninterpreted and
11 * can represent anything you like.
12 */
13 public final void acquire(int arg) {
14 if (!tryAcquire(arg) &&
15 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
16 selfInterrupt();
17 }
看到没有,这个之前ReentrantLock中调用的是同一个acquire方法,说明它是以独占的方式去获得锁的,不过具体的tryAcquire有一点点不同:
1 protected final boolean tryAcquire(int acquires) {
2 /*
3 * Walkthrough:
4 * 1. If read count nonzero or write count nonzero
5 * and owner is a different thread, fail.
6 * 2. If count would saturate, fail. (This can only
7 * happen if count is already nonzero.)
8 * 3. Otherwise, this thread is eligible for lock if
9 * it is either a reentrant acquire or
10 * queue policy allows it. If so, update state
11 * and set owner.
12 */
13 Thread current = Thread.currentThread();
14 int c = getState();
15 int w = exclusiveCount(c);
16 if (c != 0) {
17 // (Note: if c != 0 and w == 0 then shared count != 0)
18 if (w == 0 || current != getExclusiveOwnerThread())
19 return false;
20 if (w + exclusiveCount(acquires) > MAX_COUNT)
21 throw new Error("Maximum lock count exceeded");
22 // Reentrant acquire
23 setState(c + acquires);
24 return true;
25 }
26 if (writerShouldBlock() ||
27 !compareAndSetState(c, c + acquires))
28 return false;
29 setExclusiveOwnerThread(current);
30 return true;
31 }
过程还是比较清晰的,首先检查是否是readers占有锁,或者是已经有别的writer占有锁,如果上述情况成立,就会乖乖返回false去CLH里排队(没错,它不在这里插队) 如果是自己已经拿到了锁,那就会重入,返回true 如果当前没有人获得锁,就会进入writerShouldBlock()看看允不允许自己抢到这个锁,允许的话就CAS咯 我们来看看非公平模式的writerShouldBlock:
1 final boolean writerShouldBlock() {
2 return false; // writers can always barge
3 }
我勒个去,这就是非公平的体现,直接返回false,那代表着无论CLH里有什么,当前线程都会尝试去CAS获得锁 来看看公平模式是怎么做的:
1 final boolean writerShouldBlock() {
2 return hasQueuedPredecessors();
3 }
你看,它就公平多了,hasQueuedPredecessors就是看有没有已经有node在排队了,如果有的话,tryAcquire直接返回false了,没有才会尝试CAS 是不是跟ReentrantLock的招式师出同门的感觉呢。 再来看看unlock咯,最终也会调用AbstractQueuedSynchronizer的release,最后会调用Sync的tryRelease:
1 protected final boolean tryRelease(int releases) {
2 if (!isHeldExclusively())
3 throw new IllegalMonitorStateException();
4 int nextc = getState() - releases;
5 boolean free = exclusiveCount(nextc) == 0;
6 if (free)
7 setExclusiveOwnerThread(null);
8 setState(nextc);
9 return free;
10 }
我们会发现除了用exclusiveCount来计算writer的重入次数和ReentrantLock不同外,其他就是一毛一样
至此,我们论证了我们的结论,WriteLock就是一个独占模式,与ReetrantLock的逻辑几乎是一样的
总结
纵观全文,无论是共享锁(ReadLock),还是独占锁(WriteLock), 它们都是通过一个状态用CAS来决定到底是不是拿到这个锁的,并且共享所和独占锁之间也是互斥的。我们也分析了公平模式和非公平模式的区别,一句话概括就是前者是FIFO而后者会优先考虑writer不会让他们等太久。等待队列还是有AbstractQueuedSynchronizer来维护。 至此,我们可以发现,锁这个东西,终究还是对一块内存的操作是否能成功以及这块内存的内容是否是由当前线程自己决定的,如果是,那当前线程就能获得所谓的”锁”,反之就是不能
最后问个问题,为什么ReetrantReadWriteLock中WriteLock可以降级为ReadLock但是反过来就不行呢?答案就在Sync中的tryAcquireShared和tryAcquire中~