CountDownLatch、Semaphore和CyclicBarrier
# 前言
日常工作中,经常会碰到这样的场景:有时候数据量特别大,任务量特别多,我们通常会开启多线程去分批执行任务,在所有任务执行完了之后,再去执行接下来的作业。
这时候,会想到 Java 并发包提供的基础工具类, 有 CountDownLatch、Semaphore 和 CyclicBarrier ,它们都是提供多线程环境的协调功能,但是具体有什么区别呢?
# CountDownLatch
CountDownLatch是在 java1.5 被引入的,CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成各自的工作后再执行。
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
# CountDownLatch的使用
//构造器中的计数值(count)实际上就是闭锁需要等待的线程数量,这个值只能被设置一次
public void CountDownLatch(int count) {...}
// 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
public void await() throws InterruptedException { };
// 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。
countDown();
2
3
4
5
6
7
8
9
10
11
12
主线程在启动其他线程后立即调用 **CountDownLatch.await()**方法,这样主线程就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他线程必须引用闭锁对象,他们已经完成了各自的任务,需要通知CountDownLatch对象,这种通知机制是通过 **CountDownLatch.countDown()**方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。
# CountDownLatch应用场景
场景1 让多个线程等待:模拟并发,让并发线程一起执行
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
//运动员都阻塞在这,等待号令
countDownLatch.await();
System.out.println("【" + Thread.currentThread().getName() + "】" + "开始执行……");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 裁判准备发令
Thread.sleep(2000);
countDownLatch.countDown();
System.out.println("发令枪响");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
发令枪响 【Thread-2】开始执行…… 【Thread-0】开始执行…… 【Thread-3】开始执行…… 【Thread-4】开始执行…… 【Thread-1】开始执行……
场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并
public static void main(String[] args) throws InterruptedException {
//用于聚合所有的统计指标
Map map=new HashMap();
//创建计数器,这里需要统计4个指标
CountDownLatch countDownLatch=new CountDownLatch(4);
//记录开始时间
long startTime=System.currentTimeMillis();
Thread countUserThread=new Thread(() -> {
try {
System.out.println("统计用户数量");
Thread.sleep(1000);
map.put("userNumber",1);//保存结果值
countDownLatch.countDown();//标记已经完成一个任务
System.out.println("统计新增用户数量完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread countOrderThread=new Thread(() -> {
try {
System.out.println("统计订单数量");
Thread.sleep(2000);
map.put("countOrder",2);//保存结果值
countDownLatch.countDown();//标记已经完成一个任务
System.out.println("统计订单数量完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread countGoodsThread=new Thread(() -> {
try {
System.out.println("统计商品销量");
Thread.sleep(3000);//任务执行需要3秒
map.put("countGoods",3);//保存结果值
countDownLatch.countDown();//标记已经完成一个任务
System.out.println("统计商品销量完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread countmoneyThread=new Thread(() -> {
try {
System.out.println("统计总销售额");
Thread.sleep(4000);//任务执行需要3秒
map.put("countmoney",4);//保存结果值
countDownLatch.countDown();//标记已经完成一个任务
System.out.println("统计销售额完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//启动子线程执行任务
countUserThread.start();
countGoodsThread.start();
countOrderThread.start();
countmoneyThread.start();
try {
//主线程等待所有统计指标执行完毕
countDownLatch.await();
long endTime=System.currentTimeMillis();//记录结束时间
System.out.println("------统计指标全部完成--------");
System.out.println("统计结果为:"+map.toString());
System.out.println("任务总执行时间为"+(endTime-startTime)/1000+"秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
执行结果:
统计用户数量 统计商品销量 统计总销售额 统计订单数量 统计新增用户数量完毕 统计订单数量完毕 统计商品销量完毕 统计销售额完毕 ------统计指标全部完成-------- 统计结果为:{countmoney=4, countOrder=2, userNumber=1, countGoods=3} 任务总执行时间为4秒
CountDownLatch工作原理
CountDownLatch 底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的。
而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。
# CyclicBarrier
CyclicBarrier
字面意思就是回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。
# CyclicBarrier的使用
// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
public CyclicBarrier(int parties)
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
2
3
4
5
6
7
8
一般使用流程:
// 初始化值为5的栅栏
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
// 每个线程调用 await()
cyclicBarrier.await();
// 等到有 5 个线程都执行了 await() 之后,继续执行。
// 并且 栅栏的 计数器会自动重置为 5 ,可以接着用
2
3
4
5
6
# CyclicBarrier应用场景
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。
//保存每个学生的成绩
private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
private ExecutorService threadPool = Executors.newFixedThreadPool(3);
private CyclicBarrier cb = new CyclicBarrier(3, () -> {
int result = 0;
Set<String> set = map.keySet();
for (String s : set) {
result += map.get(s);
}
System.out.println("三人平均成绩为:" + (result / 3) + "分");
});
public void count() {
for (int i = 0; i < 3; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
//获取学生成绩
int score = (int) (Math.random() * 40 + 60);
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName()
+ "同学的成绩为:" + score);
try {
//执行完运行await(),等待所有学生平均成绩都计算完毕
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
public static void main(String[] args) {
CyclicBarrierTest cb = new CyclicBarrierTest();
cb.count();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
pool-1-thread-1同学的成绩为:67 pool-1-thread-3同学的成绩为:69 pool-1-thread-2同学的成绩为:89 三人平均成绩为:75分
复用的场景
当之前的所有线程都达到栅栏之后,初始化的栅栏仍然能够在下一次线程操作复用,而不像CountDownLatch
一样减为0之后就无法再使用了。
public class CyclicBarrierTest1 {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println("读取" + Thread.currentThread().getName());
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("后续操作" + Thread.currentThread().getName());
}).start();
}
Thread.sleep(5000);
System.out.println("CyclicBarrier复用");
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println("读取" + Thread.currentThread().getName());
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("后续操作" + Thread.currentThread().getName());
}).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
读取Thread-0 读取Thread-4 读取Thread-3 读取Thread-2 读取Thread-1 后续操作Thread-1 后续操作Thread-4 后续操作Thread-2 后续操作Thread-0 后续操作Thread-3 CyclicBarrier复用 读取Thread-5 读取Thread-6 读取Thread-7 读取Thread-8 读取Thread-9 后续操作Thread-9 后续操作Thread-5 后续操作Thread-8 后续操作Thread-7 后续操作Thread-6
# CyclicBarrier与CountDownLatch的区别
- CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
- CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、isBroken(用来知道阻塞的线程是否被中断)等方法。
- CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
- CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
- CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
- CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现
# 总结
CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行; CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行。 对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。