Java – 可重入锁ReentrantLock实现原理
注:不同版本的JDK,此部分源码存在差异,当下17版本与本文1.8版本差异较大
在实现层面除了依赖于CAS(compareAndSet)方法之外,同时依赖于类LockSupport中的一些方法。
一、LockSupport
类 LockSupport 位于包 java.util.concurrent.locks
,其基本方法有
public static void park()
public static void parkNanos(long nanos)
public static void parkUntil(long deadline)
public static void unpark(Thread thread)
其中park
方法会使得当前线程放弃CPU,进入等待(WAITING)状态,操作系统不会再对其进行调度。直到其他线程对它调用了unpark
方法,其中unpark
方法使得参数指定的线程恢复可运行状态:
public class LockSupportTest {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread (){
public void run(){
LockSupport.park(); //放弃CPU
System.out.println("exit");
}
};
t.start(); //启动线程
Thread.sleep(1000); //睡眠1秒保证子线程先运行
System.out.println("after 1 second");
LockSupport.unpark(t);
}
}
以上代码中,首先主线程启动了一个子线程t,之后线程t调用了park进入阻塞态。主线程睡眠1秒保证子线程已经调用了 LockSupport.park();
方法。最后主线程调用unpark
方法,使得子线程t恢复运行,并且在控制台进行输出打印。
park 方法不同于 Thread.yield() 方法。 yield 只是告诉操作系统可以让其他线程先运行,但是自己可以仍是运行态。 而 park 方法则是放弃线程的运行资格,使得线程进入 WAITING 等待状态。
同时 park 方法是响应中断的,当有中断发生时,park方法会返回,并且重新设置线程的中断状态。
park 方法有两个变体
- parkNanos: 可以指定等待的最长时间,参数时相对于当前时间的纳秒数。
- parkUntil:可以指定最长等待的时间,参数时绝对时间,相对于纪元时的毫秒数。
当等待超时,方法就会返回。同时还有一些其他的变体,可以指定一个对象,表示是由于该对象而进行等待,以便于调试,一般情况下传递的参数为 this,比如:
public static void park(Object blocker)
LockSupport含有一个方法,可以返回一个线程的blocker对象:
/**
* Returns the blocker object supplied to the most recent
* invocation of a park method that has not yet unblocked, or null
* if not blocked. The value returned is just a momentary
* snapshot -- the thread may have since unblocked or blocked on a
* different blocker object.
*
* @param t the thread
* @return the blocker
* @throws NullPointerException if argument is null
* @since 1.6
*/
public static Object getBlocker(Thread t) {
if (t == null)
throw new NullPointerException();
return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}
二、AQS
在Java中除了可重入锁,还有其他的并发工具,比如 ReentrantReadWriteLock,Semaphore,CountDownLatch等等,他们的实现还有其他类似的地方,为了复用代码,Java提供了一个抽象的类AbstractQueuedSynchronizer
,简称AQS,其简化了并发工具的实现。
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。
AQS 使用一个用volatile修饰的 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。
AQS 定义两种资源共享方式:Exclusive(独占)、Share(共享)
AQS 底层使用了模板方法模式:对于需自定义的情景,采用继承+重写的方式。
这里只对AQS进行简单的介绍,在AQS中封装了一个状态,并且给子类提供了查询和设置状态的方法:
private volatile int state;
protected final int getState()
protected final void setState(int newState)
protected final boolean compareAndSetState(int expect, int update)
在用于锁的实现时,AQS可以保存锁的当前持有线程,提供了方法进行查询和设置:
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t)
protected final Thread getExclusiveOwnerThread()
AQS内部维护了一个等待队列,借助CAS方法实现无阻塞算法进行更新。
三、ReentrantLock
ReentrantLock内部使用AQS的时候,主要有以下的三个内部类。
abstract static class Sync extends AbstractQueuedSynchronizer
static final class NonfairSync extends Sync
static final class FairSync extends Sync
其中Sync是抽象类,NonfairSync是 fair 为 false 时使用的类,而 FairSync 是 fair 为 true 时需要使用的类。
在 ReentrantLock 内部有一个 Sysc 成员:
private final Sync sync;
该成员在构造方法中被初始化,例如:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
接下来查看 ReentrantLock 中基本方法 lock/unlock 的实现。
3.1 lock
因为 sync 默认是 NonfairSync,且非公平方式更为常用。其中 NonfairSync 的 lock 代码为:
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
ReentrantLock 中使用 state 用于表示是否被锁和持有的数量,如果当前未被锁定,则立即获得锁,否则调用acquire(1);
方法来获得锁,其中acquire
为AQS中的方法,其实现为:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这段代码调用 tryAcquire 方法来尝试获取锁,该方法需要被子类重写,在 NonFairSync 中的实现为:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
而 nonfairTryAcquire 方法是由抽象类 Sync 实现的:
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这段代码的意思是,如果没有被锁定,则使用CAS进行锁定;如果当前线程已经被锁定,则增加锁定次数(如果当前锁的状态不是0,就去比较当前线程和占用锁的线程是不是一个线程,如果是,会去增加状态变量的值,从这里看出可重入锁之所以可重入,就是同一个线程可以反复使用它占用的锁)。。
如果 tryArquire方法返回false,则acquire
方法会继续调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
。
其中,addWaiter 会新建一个节点 Node,代表当前线程,然后加入内部的等待队列中。
在当如等待队列之后,调用 acquireQueued 来尝试获取锁,其代码为:
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里的代码主体是一个死循环,在每次循环中,首先检查当前节点是否为第一个等待的节点,如果是且能获取到锁,就将当前节点从等待队列中移除冰洁返回,否则通过parkAndCheckInterrupt
方法最终调用 LockSupport.park
而放弃CPU,进入等待状态,在被唤醒之后检查是否发生了中断,记录中断标志。并且返回中断标志。
如果发生了中断,acquire 方法会调用 selfInterrupt
方法来设置中断标志位,其实现代码为:
/**
* Convenience method to interrupt current thread.
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
由上可以得出 lock 方法的基本过程为:如果能获得锁则立即获得,如若不能则加入等待队列。被唤醒之后检查自己是否为第一个等待的线程,如果是且能获得锁则返回,否则继续等待。如果在该过程中发生了中断, lock 会记录中断标志位,但是不会提前返回或者抛除异常
3.2 unlock
ReentrantLock 的 unlock 方法代码为:
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
其中调用的release
方法为 AQS中定义的方法,其实现为:
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
进入release方法时,内部先尝试tryRelease操作,主要是去除锁的独占线程,然后将状态减一,这里减一主要是考虑到可重入锁可能自身会多次占用锁,只有当状态变成0,才表示完全释放了锁。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor 方法会调用 LockSupport.unpark 将第一个等待的线程唤醒。其实现为:
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
3.3 公平与非公平对比
FairSync 和 NonfairSync 中的主要区别为: 在获取锁时,在tryAcquire 方法中,如果当前线程状态没有被锁定,也即c == 0
,FairSysc 会多一个检查,实现如下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
...
}
这个检查的目的是,当没有其他等待时间更长的线程时,才能获取到锁。
为什么不默认保证公平? 保证公平整体性能会比较低,其原因不是因为检查慢,而是因为会让活跃线程无法得到锁,从而进入等待状态,引起了频繁的上下文切换,降低了整体的效率。
通常情况下,线程运行的顺序影响不大。当程序长期运行时,在统计学的搅动,不公平的线程处理方式,基本上是公平的,可以使得活跃线程持续运行。
需要注意的是,即使构造是的参数 fair 为 true, ReentrantLock 中不带参数的 tryLock 方法也是不保证的公平的,它并不会检测是否有其他等待时间更长的线程。
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
这从方法调用中,可以很明显的看到。
四、ReentrantLock和synchronized对比
相比 synchronized , ReentrantLock可以实现与 synchronized 相同的语义,而且支持以非阻塞方式获取锁,也可以想用中断,限时阻塞,更为灵活。 但是 synchronized 的使用更为简单,代码量也更少。
synchronized 代表的是一种声明式编程思维, 程序员表达的更多是一种同步声明。 由 Java 系统负责实现,程序员并不清楚实现细节; 而显式锁代表一种命令式编程思维,使用者需要实现所有的细节。
声明式编程的好处除了简单,在性能上也有所体现。 在较新版本的 JVM 上,ReentrantLock和synchronized的性能是接近的, 并且 Java 编译器和虚拟机会不断优化 synchronized 的实现,比如自动分析 synchronized 的使用,对于没有锁竞争的场景,自动忽略对获取锁/释放锁的调用。