重入锁ReentrantLock

重入锁ReentrantLock
ReentrantLock,可重入锁,排他锁,可以等同于synchronized的使用,但是ReentrantLock提供了比synchronized更强大、灵活的锁机制,可以减少死锁发生的概率。

实现

ReentrantLockUML图

FairSyncNonfairSyncSync是ReentrantLock的内部类,FairSync和NonfairSync继承自Sync,Sync继承自AbstractQueuedSynchronizer。

1
static final class NonfairSync extends Sync {...}
1
static final class FairSync extends Sync {...}
1
abstract static class Sync extends AbstractQueuedSynchronizer {...}

ReentrantLock里的大部分功能都是委托给Sync实现的,FairSync和NonfairSync二者内部只有lock()和tryAcquire(),Sync提供了抽象方法Lock(),并且默认实现了nonfairTryAcquire(int acquires),AbstractQueuedSynchronizer提供了抽象方法tryAcquire()。

获取锁

1
2
3
4
5
//非公平锁  sync = new NonfairSync();
ReentrantLock lock = new ReentrantLock();
//公平锁 sync = fair ? new FairSync() : new NonfairSync();
ReentrantLock lock = new ReentrantLock(true);
lock.lock();
1
2
3
public void lock() {
sync.lock();
}

非公平锁为例:

获取锁的大概流程

(AQS)getState == 0? 否分支—>可重入锁的具体表现

非公平锁加解锁

非公平锁NonfairSync获取锁lock()的时候

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))//尝试获取锁
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);//ReentrantLock是可重入锁,此处1是为了再次获取了锁进行statu累加
}

若锁已被占用,便会执行acquire(int arg)(该方法由AbstractQueuedSynchronizer实现了)。

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

各个方法定义如下:

  1. tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法自定义组件实现(可以看到NonfairSync内部类已实现),该方法必须要保证线程安全的获取同步状态。

    tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。

  2. addWaiter:如果tryAcquire返回FALSE(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部。(AQS简述里有描述)

  3. acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;并且返回当前线程在等待过程中有没有中断过。acquireQueued方法可以对排队中的线程进行“获锁”操作。

  4. selfInterrupt:产生一个中断。

非公平锁NonfairSync的同步状态的获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//Sync内部实现的,典
//该方法主要逻辑:
//典型的可重入锁...................................
//首先判断同步状态state == 0 ?,如果是表示该锁还没有被线程持有,直接通过CAS获取同步状态,如果成功返回true。
//如果state != 0,则判断当前线程是否为获取锁的线程,如果是则获取锁,成功返回true。成功获取锁的线程再次获取锁,这是增加了同步状态state。
final boolean nonfairTryAcquire(int acquires) {
//当前线程
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//state == 0,表示没有该锁处于空闲状态
if (c == 0) {
//获取锁成功,设置为当前线程所有
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//线程重入
//判断锁持有的线程是否为当前线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

setExclusiveOwnerThread(current)getExclusiveOwnerThread()类似于set,get。

1
2
3
4
5
6
7
//由AbstractOwnableSynchronizer提供
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}

acquireQueued方法为一个自旋的过程,也就是说当前线程(Node)进入同步队列后,就会进入一个自旋的过程,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。“何时出队列?”和“如何出队列?“,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

注:setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer

// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取头结点的节点状态
int ws = pred.waitStatus;
// 说明头结点处于唤醒状态
if (ws == Node.SIGNAL)
return true;
// 通过枚举值我们知道waitStatus>0是取消状态
if (ws > 0) {
do {
// 循环向前查找取消节点,把取消节点从队列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置前任节点等待状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。

1
2
3
4
5
6
// java.util.concurrent.locks.AbstractQueuedSynchronizer
//LockSupport底层其实调用的是Unsafe,有一篇记录了Unsafe的主要功能
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

入队出队

从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire流程):

shouldParkAfterFailedAcquire流程

acquire()执行流程如下:

ReentrantLock-acquire

释放锁

ReentrantLock提供了unlock释放锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void unlock() {
sync.release(1);
}
//AQS中定义实现的
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryAcquire一样,tryRelease也需要组件自定义实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//Sync中实现
protected final boolean tryRelease(int releases) {
//减掉releases
int c = getState() - releases;
//如果释放的不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//state == 0 表示已经释放完全了,其他线程可以获取同步状态了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

公平锁和非公平锁

公平锁与非公平锁的区别在于获取锁tryAcquire的时候是否按照FIFO的顺序来。释放锁不存在公平性和非公平性

比较非公平锁和公平锁获取同步状态的过程,会发现两者唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()

hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回False,说明当前线程可以争取共享资源;如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。

双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。双端链表的头结点是一个无参构造函数的头结点。

当h != t时: 如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,需要返回True(这块具体见下边代码分析)。如果(s = h.next) != null,说明此时队列中至少有一个有效节点。如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。

1
2
3
4
5
6
7
public final boolean hasQueuedPredecessors() {
Node t = tail; //尾节点
Node h = head; //头节点
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
//上边是lock()先直接去获取锁,没有获得锁,然后入队执行addWaiter(),假如队内没有节点,执行enq(node);具体内容如下:
// java.util.concurrent.locks.AbstractQueuedSynchronizer #enq

if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}

节点入队不是原子操作,所以会出现短暂的head != tail,此时Tail指向最后一个节点,而且Tail指向Head。如果Head没有指向Tail(可见5、6、7行),这种情况下也需要将相关线程加入队列中。所以这块代码是为了解决极端情况下的并发问题。

ReentrantLock与synchronized的区别

博客看的比较专业的

  1. 与synchronized相比,ReentrantLock提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候,锁投票。
  2. ReentrantLock还提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活,所以在多个条件变量和高度竞争锁的地方,ReentrantLock更加适合(以后会阐述Condition)。
  3. ReentrantLock提供了可轮询的锁请求。它会尝试着去获取锁,如果成功则继续,否则可以等到下次运行时处理,而synchronized则一旦进入锁请求要么成功要么阻塞,所以相比synchronized而言,ReentrantLock会不容易产生死锁些。
  4. ReentrantLock支持更加灵活的同步代码块,但是使用synchronized时,只能在同一个synchronized块结构中获取和释放。注:ReentrantLock的锁释放一定要在finally中处理,否则可能会产生严重的后果。
  5. ReentrantLock支持中断处理,且性能较synchronized会好些。

自己看课程总结的和Lock的区别

  1. Synchronized内置的java关键字,Lock是一个java类。
  2. Synchronized无法判断获取锁的状态,Lock可以判断是否获取到锁。
  3. Synchronized可以自动释放锁,Lock必须手动释放锁(不释放,会造成死锁):手动挡和自动挡。
  4. Synchronized线程1(获得锁,阻塞),线程2(等待,傻傻的等),Lock锁不一定一直等待(tryLock())。
  5. Synchronized可重入锁,不可中断,非公平,Lock可重入锁,可判断锁状态,默认非公平(可设置)。
  6. Synchronized适合锁少量代码同步问题,Lock适合锁大量代码。

重入锁ReentrantLock
http://www.muzili.ren/2022/06/11/ReentrantLock/
作者
jievhaha
发布于
2022年6月11日
许可协议