Semaphore
Semaphore,字面意思信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。和CountDownLatch一样,其本质上是一个“共享锁”。 |
内部实现
Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。
访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
访问资源后,使用release释放许可。
同样的Semaphore也是依赖内部类Sync实现,而Sync继承AQS。Semaphore内部包含公平锁(FairSync)和非公平锁(NonfairSync),和ReentrantLock类似。
Semaphore提供了两个构造函数,Semaphore默认选择非公平锁。:
Semaphore(int permits) :创建具有给定的许可数和非公平的公平设置的 Semaphore。
Semaphore(int permits, boolean fair) :创建具有给定的许可数和给定的公平设置的 Semaphore。
1 2 3 4 5 6 7
| public Semaphore(int permits) { sync = new NonfairSync(permits); }
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
|
当信号量Semaphore = 1 时,它可以当作互斥锁使用。其中0、1就相当于它的状态,当=1时表示其他线程可以获取,当=0时,排他,即其他线程必须要等待。
信号量获取
acquire()方法来获取一个许可。结构和ReentrantLock一致
1 2 3
| public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
|
await内部调用AQS的acquireSharedInterruptibly
1 2 3 4 5 6 7
| public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
|
tryAcquireShared由Semaphore内部类Sync重写实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
|
信号量释放
获取了许可,当用完之后使用release()来释放许可。和CountDownLatch类似
1 2 3
| public void release() { sync.releaseShared(1); }
|
release内部调用AQS的releaseShared
1 2 3 4 5 6 7
| public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
|
tryReleaseShared方法由Semaphore的内部类Sync重写实现:
1 2 3 4 5 6 7 8 9 10 11 12
| protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
|
应用场景
多个共享资源互斥的使用
并发限流,控制最大线程数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class App { public static void main(String[] args){ Semaphore semaphore = new Semaphore(1); ExecutorService pools = Executors.newCachedThreadPool(); for(int i = 0; i < 2; i++){ Runnable runnable = () -> { try{ semaphore.acquire(); System.out.println("任务" + Thread.currentThread().getName() + "开始执行"); TimeUnit.SECONDS.sleep(1); }catch (InterruptedException e){
}finally { semaphore.release(); System.out.println("任务" + Thread.currentThread().getName() + "执行完毕"); } }; pools.execute(runnable); } pools.shutdown(); } }
|
运行结果如下: