Semaphore

Semaphore
Semaphore,字面意思信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。和CountDownLatch一样,其本质上是一个“共享锁”。

内部实现


Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。

  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。

  • 访问资源后,使用release释放许可。

    同样的Semaphore也是依赖内部类Sync实现,而Sync继承AQS。Semaphore内部包含公平锁(FairSync)和非公平锁(NonfairSync),和ReentrantLock类似。

    Semaphore内部实现图

    Semaphore和ReentrantLock比较

    Semaphore提供了两个构造函数,Semaphore默认选择非公平锁。:

    1. Semaphore(int permits) :创建具有给定的许可数和非公平的公平设置的 Semaphore。

    2. Semaphore(int permits, boolean fair) :创建具有给定的许可数和给定的公平设置的 Semaphore。

      1
      2
      3
      4
      5
      6
      7
      public Semaphore(int permits) {
      sync = new NonfairSync(permits);
      }

      public Semaphore(int permits, boolean fair) {
      sync = fair ? new FairSync(permits) : new NonfairSync(permits);
      }

当信号量Semaphore = 1 时,它可以当作互斥锁使用。其中0、1就相当于它的状态,当=1时表示其他线程可以获取,当=0时,排他,即其他线程必须要等待。

信号量获取

acquire()方法来获取一个许可。结构和ReentrantLock一致

1
2
3
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

await内部调用AQS的acquireSharedInterruptibly

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

tryAcquireShared由Semaphore内部类Sync重写实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//公平
protected int tryAcquireShared(int acquires) {
for (;;) {
//判断该线程是否位于CLH队列的列头
if (hasQueuedPredecessors())
return -1;
//获取当前的信号量许可
int available = getState();

//设置“获得acquires个信号量许可之后,剩余的信号量许可数”
int remaining = available - acquires;

//CAS设置信号量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//非公平
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {//Sync实现
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
信号量释放

获取了许可,当用完之后使用release()来释放许可。和CountDownLatch类似

1
2
3
public void release() {
sync.releaseShared(1);
}

release内部调用AQS的releaseShared

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

tryReleaseShared方法由Semaphore的内部类Sync重写实现:

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
//设置可获取的信号许可数为next
if (compareAndSetState(current, next))
return true;
}
}

应用场景


  1. 多个共享资源互斥的使用

  2. 并发限流,控制最大线程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class App 
{
public static void main(String[] args){
Semaphore semaphore = new Semaphore(1);
ExecutorService pools = Executors.newCachedThreadPool();
for(int i = 0; i < 2; i++){
Runnable runnable = () -> {
try{
semaphore.acquire();
System.out.println("任务" + Thread.currentThread().getName() + "开始执行");
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){

}finally {
semaphore.release();
System.out.println("任务" + Thread.currentThread().getName() + "执行完毕");
}
};
pools.execute(runnable);
}
pools.shutdown();
}
}

运行结果如下:

Semaphore测试结果


Semaphore
http://www.muzili.ren/2022/06/11/Semaphore/
作者
jievhaha
发布于
2022年6月11日
许可协议