基本概念

Semaphore也是一个线程同步的辅助类,在多线程环境下用于协调各个线程, 以保证它们能够正确、合理的使用公共资源。信号量维护了一个许可集,我们在初始化Semaphore时需要为这个许可集传入一个数量值,该数量值代表同一时间能访问共享资源的线程数量。

使用场景

这个应用就比较广泛了,主要用于流量控制,例如限制某接口或者静态资源的最大并发访问数,上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Test {

private static Semaphore semaphore = new Semaphore(2);

public static void main(String[] args) {

for(int i =1; i <= 10; i++){

Thread thread = new Thread(new Runnable() {
@Override
public void run() {
select();
}
});

thread.setName("线程序号" + i);
thread.start();
}
}

private static Object select(){

try {

// 注册
semaphore.acquire();

// 模拟查询耗时
Thread.currentThread().sleep(500L);

// 打印信息
StringBuilder info = new StringBuilder();
info.append(Thread.currentThread().getName());
info.append("进入查询方法,");
info.append("空闲通道:").append(semaphore.drainPermits());
info.append(",");
info.append("等待线程数:").append(semaphore.getQueueLength());
System.out.println(info.toString());
} catch (Exception e) {
e.printStackTrace();
}finally {
// 释放
semaphore.release();
}

return new Object();
}
}

打印结果:
线程序号1进入查询方法,空闲通道:0,等待线程数:8
线程序号2进入查询方法,空闲通道:0,等待线程数:8
线程序号3进入查询方法,空闲通道:0,等待线程数:6
线程序号4进入查询方法,空闲通道:0,等待线程数:6
线程序号5进入查询方法,空闲通道:0,等待线程数:4
线程序号6进入查询方法,空闲通道:1,等待线程数:4
线程序号7进入查询方法,空闲通道:0,等待线程数:3
线程序号8进入查询方法,空闲通道:0,等待线程数:2
线程序号9进入查询方法,空闲通道:0,等待线程数:1
线程序号10进入查询方法,空闲通道:0,等待线程数:0

打印结果可以看出来,同一时刻只能有2个线程对方法进行访问。

构造器源码

1
2
3
4
5
6
7
8
9
10

// 默认使用非公平锁
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

// 可以通过构造器参数指定是否公平竞争
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore对象可以通过构造器指定访问限制,还可以指定争夺的公平方式。

Sync源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

// 初始化访问
Sync(int permits) {
setState(permits);
}

final int getPermits() {
return getState();
}

// 非公平方式获取AQS共享式资源
final int nonfairTryAcquireShared(int acquires) {
// 自旋
for (;;) {
// 获取state值
int available = getState();
// 计算获取资源后值应该是多少
int remaining = available - acquires;
// 如果大于等于0说明满足条件,将计算后值通过CAS修改后返回,如果小于0直接返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// 释放锁,就是把state加回来
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取state值
int current = getState();
// 计算加后的值
int next = current + releases;
// 如果加后值小于当前state值,说明参数为负数,抛异常
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 使用CAS方式修改值
if (compareAndSetState(current, next))
return true;
}
}

// state减操作
final void reducePermits(int reductions) {
// 自旋
for (;;) {
// 获取state值
int current = getState();
// 计算减后的值
int next = current - reductions;
// 如果减后值大于当前state值,说明参数为负数,抛异常
if (next > current) // underflow
throw new Error("Permit count underflow");
// 使用CAS方式修改值
if (compareAndSetState(current, next))
return;
}
}

// 将state归零
final int drainPermits() {

// 自旋
for (;;) {

// 获取state
int current = getState();

// 如果是0直接返回0,不是0使用CAS设置成0在返回0,这是要干啥?
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

Sync类实现了很多方法:

  • nonfairTryAcquireShared():非公平性获取共享式锁,不进行排队直接自旋获取。
  • tryReleaseShared():释放共享式锁,使用CAS方式对state执行加操作。
  • reducePermits():使用CAS方式对state执行减操作。
  • drainPermits():将state设置为0并返回,不知道想干啥?

公平/非公平Sync源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

// 非公平
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

// 调用父类写好的方法,非公平式获取锁
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

// 公平
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

// 无限自旋直到CAS修改成功
protected int tryAcquireShared(int acquires) {
for (;;) {

// 比非公平锁多了一个步骤,判断前面是否有人,如果前面有人就放弃
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

公平与非公平Sync逻辑几乎一样,只是公平锁在尝试获取资源的时候会先去判断前面是否已经有人,如果有人就放弃尝试,进入AQS的等待阻塞方法。而非公平锁不管前面有没有人都会尝试获取直到成功。

acquire()源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {

// 先判断是否已经被中断
if (Thread.interrupted())
throw new InterruptedException();

// 如果访问次数已经耗尽,进入doAcquireSharedInterruptibly()方法阻塞
if (tryAcquireShared(arg) < 0)

/*
* 源码就不贴了,AQS写好的方法:
* 排在等待队列的第一个,自旋等待直到重写方法tryAcquireShared(arg)返回值大于0跳出自旋
* 排在等待队列的第二个开始,直接挂起一边呆着去...
*/
doAcquireSharedInterruptibly(arg);
}

总结

这没啥好说的,只不过现在都是分布式项目,如果限流的目的是减少数据库或静态资源的访问,单靠Semaphore无法实现,还需要依靠基于Redis或Zookeeper的分布式锁实现,感觉用处不多。

评论