Condition

Condition
在没有Lock之前,我们使用synchronized来控制同步,配合Object的wait()、notify()系列方法可以实现等待/通知模式。在Java SE5后,Java提供了Lock接口,配合Condition的await()、signal() 实现等待/通知模式。

Condition的实现

Condition必须要配合锁一起使用,一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。获取一个Condition必须要通过Lock的newCondition()方法。该方法定义在接口Lock下面,返回的结果是绑定到此 Lock 实例的新 Condition 实例。Condition为一个接口,其下仅有一个实现类ConditionObject,由于Condition的操作需要获取相关的锁,而AQS则是同步锁的实现基础,所以ConditionObject则定义为AQS的内部类。

Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
...
}
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
...
}
...
}
等待队列

每个Condition对象都包含着一个FIFO队列,在队列中每一个节点都包含着一个线程引用,该线程就是在该Condition对象上等待的线程。

当前线程调用await()方法,将会以当前线程构成一个节点(Node),并将节点加入到该队列的尾部。

Node里面包含了当前线程的引用。Node定义与AQS的CLH同步队列的Node使用的都是同一个类(AbstractQueuedSynchronized.Node静态内部类)。

等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void await() throws InterruptedException {
// 当前线程中断
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放锁
long savedState = fullyRelease(node);
int interruptMode = 0;
/**
* 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
* 直到检测到此节点在同步队列上
*/
while (!isOnSyncQueue(node)) {
//线程挂起
LockSupport.park(this);
//如果已经中断了,则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}Semaphore默认选择非公平锁。
//竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理下条件队列中的不是在等待条件的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

调用Condition的await()方法会使当前线程进入等待状态,同时会调用addConditionWaiter()加入到Condition等待队列同时调用fullyRelease(Node node)释放该线程持有的锁。当从await()方法返回时,当前线程一定是获取了Condition相关连的锁。

isOnSyncQueue(Node node):如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true。

unlinkCancelledWaiters():负责将条件队列中状态不为Condition的节点删除。

通知
1
2
3
4
5
6
7
8
9
public final void signal() {
//检测是否是当前线程获得了锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//头节点,唤醒条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first); //唤醒
}

调用Condition的signal()方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到CLH同步队列中。

1
2
3
4
5
6
//Sync实现,检测是否是当前线程获得了锁
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

doSignal(Node first)主要是做两件事:1.修改头节点,2.调用transferForSignal(Node first) 方法将节点移动到CLH同步队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void doSignal(Node first) {
do {
//唤醒节点,修改头结点,完成旧头结点的移出工作
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//将节点移动到CLH同步队列中
final boolean transferForSignal(Node node) {
//将该节点从状态CONDITION改变为初始状态0,
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

//将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
Node p = enq(node);
int ws = p.waitStatus;
//如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

整个通知过程:

  1. 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。
  2. 如果线程已经获取了锁,则将唤醒条件队列的首节点。
  3. 唤醒首节点是先将条件队列中的头节点移出,然后调用AQS的enq(Node node)方法将其安全地移到CLH同步队列中。
  4. 最后判断如果该节点的同步状态是否为Cancel,或者修改状态为Signal失败时,则直接调用LockSupport唤醒该节点的线程。

结论

一个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue(Node node)方法不断自检看节点是否已经在CLH同步队列了,如果是则尝试获取锁,否则一直挂起。当线程调用signal()方法后,程序首先检查当前线程是否获取了锁,然后通过doSignal(Node first)方法唤醒CLH同步队列的首节点。被唤醒的线程,将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。

Condition实现的生产消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class ConditionTest {
private LinkedList<String> buffer; //容器
private Lock lock;
private Condition fullCondition;
private Condition notFullCondition;

ConditionTest(){
buffer = new LinkedList<String>();
lock = new ReentrantLock();
fullCondition = lock.newCondition();
notFullCondition = lock.newCondition();
}

public void set(String string) throws InterruptedException {
lock.lock(); //获取锁
try {
while (buffer.size() != 0){
notFullCondition.await(); //满了,添加的线程进入等待状态
}
System.out.println("生产了" + string);
buffer.add(string);
fullCondition.signal();
} finally {
lock.unlock(); //记得释放锁
}
}

public String get() throws InterruptedException {
String string;
lock.lock();
try {
while (buffer.size() == 0){
fullCoSemaphore默认选择非公平锁。ndition.await();
}
string = buffer.poll();
System.out.println("消费了" + string);
notFullCondition.signal();
} finally {
lock.unlock();
}
return string;
}

public static void main(String[] args) {
ConditionTest conditionTest =new ConditionTest();
new Thread(()->{
for(int i = 0; i < 5; i++){
try {
conditionTest.set("消息");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
for(int i = 0; i < 5; i++){
try {
conditionTest.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
/usr/local/java/bin/java ...
生产了消息
消费了消息
生产了消息
消费了消息
生产了消息
消费了消息
生产了消息
消费了消息
生产了消息
消费了消息

进程已结束,退出代码0

Condition
http://www.muzili.ren/2022/06/11/Condition/
作者
jievhaha
发布于
2022年6月11日
许可协议