程序员子龙(Java面试 + Java学习) 程序员子龙(Java面试 + Java学习)
首页
学习指南
工具
开源项目
技术书籍

程序员子龙

Java 开发从业者
首页
学习指南
工具
开源项目
技术书籍
  • 基础

  • JVM

  • Spring

  • 并发编程

    • 并发和并行
    • 什么是多线程
    • Java 并发容器有哪些?
    • Java的Future机制详解
    • 什么是AQS
    • 一文搞懂 ThreadLocal
    • java 阻塞队列 详细介绍
    • java线程池使用最全详解
    • 面试官问我什么是JMM
    • CountDownLatch、Semaphore和CyclicBarrier
      • 前言
      • CountDownLatch
        • CountDownLatch的使用
        • CountDownLatch应用场景
      • CyclicBarrier
        • CyclicBarrier的使用
        • CyclicBarrier应用场景
      • CyclicBarrier与CountDownLatch的区别
        • 总结
    • Java:线程的六种状态及转化
  • Mybatis

  • 网络编程

  • 数据库

  • 缓存

  • 设计模式

  • 分布式

  • 高并发

  • SpringBoot

  • SpringCloudAlibaba

  • Nginx

  • 面试

  • 生产问题

  • 系统设计

  • 消息中间件

  • Java
  • 并发编程
程序员子龙
2024-01-31
目录

CountDownLatch、Semaphore和CyclicBarrier

# 前言

日常工作中,经常会碰到这样的场景:有时候数据量特别大,任务量特别多,我们通常会开启多线程去分批执行任务,在所有任务执行完了之后,再去执行接下来的作业。

这时候,会想到 Java 并发包提供的基础工具类, 有 CountDownLatch、Semaphore 和 CyclicBarrier ,它们都是提供多线程环境的协调功能,但是具体有什么区别呢?

# CountDownLatch

CountDownLatch是在 java1.5 被引入的,CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成各自的工作后再执行。

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

# CountDownLatch的使用


//构造器中的计数值(count)实际上就是闭锁需要等待的线程数量,这个值只能被设置一次
public void CountDownLatch(int count) {...}

 // 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
public void await() throws InterruptedException { };  

// 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  

//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。
countDown();
1
2
3
4
5
6
7
8
9
10
11
12

主线程在启动其他线程后立即调用 **CountDownLatch.await()**方法,这样主线程就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他线程必须引用闭锁对象,他们已经完成了各自的任务,需要通知CountDownLatch对象,这种通知机制是通过 **CountDownLatch.countDown()**方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。

# CountDownLatch应用场景

场景1 让多个线程等待:模拟并发,让并发线程一起执行

    CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    //运动员都阻塞在这,等待号令
                    countDownLatch.await();
                    System.out.println("【" + Thread.currentThread().getName() + "】" + "开始执行……");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        // 裁判准备发令
        Thread.sleep(2000);
        countDownLatch.countDown();
        System.out.println("发令枪响");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

发令枪响 【Thread-2】开始执行…… 【Thread-0】开始执行…… 【Thread-3】开始执行…… 【Thread-4】开始执行…… 【Thread-1】开始执行……

场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并

    public static void main(String[] args) throws InterruptedException {
        //用于聚合所有的统计指标
       Map map=new HashMap();
        //创建计数器,这里需要统计4个指标
       CountDownLatch countDownLatch=new CountDownLatch(4);

        //记录开始时间
        long startTime=System.currentTimeMillis();

        Thread countUserThread=new Thread(() -> {
            try {
                System.out.println("统计用户数量");
                Thread.sleep(1000);
                map.put("userNumber",1);//保存结果值
                countDownLatch.countDown();//标记已经完成一个任务
                System.out.println("统计新增用户数量完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });
        Thread countOrderThread=new Thread(() -> {
            try {
                System.out.println("统计订单数量");
                Thread.sleep(2000);
                map.put("countOrder",2);//保存结果值
                countDownLatch.countDown();//标记已经完成一个任务
                System.out.println("统计订单数量完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });

        Thread countGoodsThread=new Thread(() -> {
            try {
                System.out.println("统计商品销量");
                Thread.sleep(3000);//任务执行需要3秒
                map.put("countGoods",3);//保存结果值
                countDownLatch.countDown();//标记已经完成一个任务
                System.out.println("统计商品销量完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });

        Thread countmoneyThread=new Thread(() -> {
            try {
                System.out.println("统计总销售额");
                Thread.sleep(4000);//任务执行需要3秒
                map.put("countmoney",4);//保存结果值
                countDownLatch.countDown();//标记已经完成一个任务
                System.out.println("统计销售额完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });
        //启动子线程执行任务
        countUserThread.start();
        countGoodsThread.start();
        countOrderThread.start();
        countmoneyThread.start();

        try {
            //主线程等待所有统计指标执行完毕
            countDownLatch.await();
            long endTime=System.currentTimeMillis();//记录结束时间
            System.out.println("------统计指标全部完成--------");
            System.out.println("统计结果为:"+map.toString());
            System.out.println("任务总执行时间为"+(endTime-startTime)/1000+"秒");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

执行结果:

统计用户数量 统计商品销量 统计总销售额 统计订单数量 统计新增用户数量完毕 统计订单数量完毕 统计商品销量完毕 统计销售额完毕 ------统计指标全部完成-------- 统计结果为:{countmoney=4, countOrder=2, userNumber=1, countGoods=3} 任务总执行时间为4秒

CountDownLatch工作原理

CountDownLatch 底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark阻塞线程;这一步是由最后一个执行countdown方法的线程执行的。

而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。

# CyclicBarrier

CyclicBarrier字面意思就是回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

# CyclicBarrier的使用

// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
 public CyclicBarrier(int parties)
 
 
 //屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞
// BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
1
2
3
4
5
6
7
8

一般使用流程:

// 初始化值为5的栅栏
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
// 每个线程调用 await()
cyclicBarrier.await();
// 等到有 5 个线程都执行了 await() 之后,继续执行。
// 并且 栅栏的 计数器会自动重置为 5 ,可以接着用
1
2
3
4
5
6

# CyclicBarrier应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。


    //保存每个学生的成绩
    private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();

    private ExecutorService threadPool = Executors.newFixedThreadPool(3);

    private CyclicBarrier cb = new CyclicBarrier(3, () -> {
        int result = 0;
        Set<String> set = map.keySet();
        for (String s : set) {
            result += map.get(s);
        }
        System.out.println("三人平均成绩为:" + (result / 3) + "分");
    });


    public void count() {
        for (int i = 0; i < 3; i++) {
            threadPool.execute(new Runnable() {

                @Override
                public void run() {
                    //获取学生成绩
                    int score = (int) (Math.random() * 40 + 60);
                    map.put(Thread.currentThread().getName(), score);
                    System.out.println(Thread.currentThread().getName()
                            + "同学的成绩为:" + score);
                    try {
                        //执行完运行await(),等待所有学生平均成绩都计算完毕
                        cb.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }

            });
        }
    }

    public static void main(String[] args) {
        CyclicBarrierTest cb = new CyclicBarrierTest();
        cb.count();
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

pool-1-thread-1同学的成绩为:67 pool-1-thread-3同学的成绩为:69 pool-1-thread-2同学的成绩为:89 三人平均成绩为:75分

复用的场景

当之前的所有线程都达到栅栏之后,初始化的栅栏仍然能够在下一次线程操作复用,而不像CountDownLatch一样减为0之后就无法再使用了。

public class CyclicBarrierTest1 {

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    System.out.println("读取" + Thread.currentThread().getName());
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("后续操作" + Thread.currentThread().getName());
            }).start();
        }
        Thread.sleep(5000);
        System.out.println("CyclicBarrier复用");
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    System.out.println("读取" + Thread.currentThread().getName());
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("后续操作" + Thread.currentThread().getName());
            }).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

读取Thread-0 读取Thread-4 读取Thread-3 读取Thread-2 读取Thread-1 后续操作Thread-1 后续操作Thread-4 后续操作Thread-2 后续操作Thread-0 后续操作Thread-3 CyclicBarrier复用 读取Thread-5 读取Thread-6 读取Thread-7 读取Thread-8 读取Thread-9 后续操作Thread-9 后续操作Thread-5 后续操作Thread-8 后续操作Thread-7 后续操作Thread-6

# CyclicBarrier与CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次
  • CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、isBroken(用来知道阻塞的线程是否被中断)等方法。
  • CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
  • CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
  • CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
  • CyclicBarrier是通过ReentrantLock的"独占锁"和Conditon来实现一组线程的阻塞唤醒的,而CountDownLatch则是通过AQS的“共享锁”实现

# 总结

CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行; CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行。 对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

上次更新: 2024/03/18, 15:55:32
面试官问我什么是JMM
Java:线程的六种状态及转化

← 面试官问我什么是JMM Java:线程的六种状态及转化→

最近更新
01
一个注解,优雅的实现接口幂等性
11-17
02
MySQL事务(超详细!!!)
10-14
03
阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了
10-09
更多文章>
Theme by Vdoing | Copyright © 2024-2024

    辽ICP备2023001503号-2

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式