博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CyclicBarrier使用及源码分析
阅读量:4148 次
发布时间:2019-05-25

本文共 5655 字,大约阅读时间需要 18 分钟。

简述:CyclicBarrier是提供为了让多个线程在达到某个点的情况下,再继续往下执行,举个例子,在计算excle表格中的数据时,每个线程进行每行数据的求和操作,在每个线程都执行结束之后,即到这个点上,再进行每行数据的求和操作,这就是CyclicBarrier要解决的问题。

先来看下javadoc中给出的示例代码

class Solver {    final int N;    final float[][] data;    final CyclicBarrier barrier;     class Worker implements Runnable {      int myRow;      Worker(int row) { myRow = row; }      public void run() {        while (!done()) {          processRow(myRow);  //处理每行数据           try {            barrier.await(); //处理结束后等待          } catch (InterruptedException ex) {            return;          } catch (BrokenBarrierException ex) {            return;          }        }      }    }     public Solver(float[][] matrix) {      data = matrix;      N = matrix.length;      Runnable barrierAction =        new Runnable() { public void run() { mergeRows(...); }};  //计算完每行操作之后,执行求和操作      barrier = new CyclicBarrier(N, barrierAction);  //创建一个barrier对象       List
threads = new ArrayList
(N); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads) thread.join(); } }

上述代码是官网提供出来的一个使用场景,部分实例是伪代码

每个线程可以在barrier.await之前执行一部分计算,barrier.await会挂起当前线程,等到线程await的数量等于barrier设定的初始数量,之前被挂起的线程会被唤醒,执行线程当中barrier.await后面的代码逻辑

具体的示例代码

public class CyclicBarrierTest2 {    static CyclicBarrier barrier = new CyclicBarrier(2,new A());    public static void main(String[] args) {        new Thread(()->{            try{                barrier.await();            }catch (Exception e) {            }            System.out.println(1);        }).start();        try{            barrier.await();        }catch (Exception e){        }        System.out.println(2);    }}class A implements Runnable{    @Override    public void run() {        System.out.println(3);    }}

执行结果 3 1 2 或者 3 2 1

关于使用方式,及异常情况这篇日志给出了非常详细的示例,对于reset,等待超时,线程中断等情况下,其他线程受到的影响这篇文章都非常详细进行了分析

源码分析

关于CyclicBarrier他的实现方式是通过内部ReentrantLock来对资源进行加锁和释放锁操作,通过ReentrantLock中的condition来进行线程阻塞和线程的唤起,关于ReentrantLock不熟悉的话,可以参看我之前的日志

先来看下CylicBarrier属性

public class CyclicBarrier {   /** 提供一个锁 */    private final ReentrantLock lock = new ReentrantLock();    /** condition来决定什么时候进行挂起和唤醒 */    private final Condition trip = lock.newCondition();    /** 初始化设定的阻塞线程数 */    private final int parties;    /* 到达线程屏障后,需要执行的操作*/    private final Runnable barrierCommand;    /** 内部类,维护boolean变量判断屏障是否失效 */    private Generation generation = new Generation();// 阻塞线程数-已经挂起的线程数 private int count;}

接下来我们看下当调用cyclicBarrier的await方法的实现逻辑

public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }

调用dowait方法,是整个cyclicBarrier实现的核心代码

private int dowait(boolean timed, long nanos)        throws InterruptedException, BrokenBarrierException,               TimeoutException {        final ReentrantLock lock = this.lock;        lock.lock(); //先对以下的操作进行加锁        try {            final Generation g = generation;            if (g.broken) // 如果已经broken 直接抛出异常                throw new BrokenBarrierException();            if (Thread.interrupted()) { //如果线程中断,generation中的broken设置为true, 并唤醒挂起的线程                breakBarrier(); //将count值重新设置成初始化的线程数                throw new InterruptedException(); //抛出中断异常            }            int index = --count; // count值减一,没调用一次await方法,count值减一,初始化时等于设定的线程数            if (index == 0) {  // tripped //如果所有的线程都已经执行了await方法                boolean ranAction = false;                try {                    final Runnable command = barrierCommand; //判断到达屏障处,是否有其他的操作,如果是的话,在当前线程中执行                    if (command != null)                        command.run();                    ranAction = true;                    nextGeneration(); // 唤醒所有挂起的线程,将count的值,重新更新为初始时的值,generation也创建一个新的                    return 0;                } finally {                    if (!ranAction)                        breakBarrier();                }            }            // 如果当前所有线程还没有执行到await方法            for (;;) {                  try {                    if (!timed)                        trip.await();  //直接挂起线程,并在最后释放掉锁                    else if (nanos > 0L)                        nanos = trip.awaitNanos(nanos);                } catch (InterruptedException ie) {                    if (g == generation && ! g.broken) {                        breakBarrier();                        throw ie;                    } else {                        // We're about to finish waiting even if we had not                        // been interrupted, so this interrupt is deemed to                        // "belong" to subsequent execution.                        Thread.currentThread().interrupt();                    }                }                if (g.broken)                    throw new BrokenBarrierException();                if (g != generation)  //需要注意,在count的值减到0时,会重新创建一个generation, 这个时候g和之前不是同一个,return                    return index;                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            lock.unlock();        }    }
private void nextGeneration() {        // signal completion of last generation        trip.signalAll();        // set up next generation        count = parties;        generation = new Generation();    }

以上就是CyclicBarrier的使用和源码分析,相对来说CyclicBarrier的源码是比较简洁易懂的

CyclicBarrier和CountDownLatch的区别就是CyclicBarrier是可以重复使用的,而CountDownLatch只能使用一次

参考:

转载地址:http://govti.baihongyu.com/

你可能感兴趣的文章
实验4-1 逻辑量的编码和关系操作符
查看>>
实验5-2 for循环结构
查看>>
实验5-3 break语句和continue语句
查看>>
实验5-4 循环的嵌套
查看>>
实验5-5 循环的合并
查看>>
实验5-6 do-while循环结构
查看>>
实验5-7 程序调试入门
查看>>
实验5-8 综合练习
查看>>
第2章实验补充C语言中如何计算补码
查看>>
深入入门正则表达式(java) - 命名捕获
查看>>
使用bash解析xml
查看>>
android系统提供的常用命令行工具
查看>>
【Python基础1】变量和字符串定义
查看>>
【Python基础2】python字符串方法及格式设置
查看>>
【Python】random生成随机数
查看>>
【Python基础3】数字类型与常用运算
查看>>
Jenkins迁移jobs
查看>>
【Python基础4】for循环、while循环与if分支
查看>>
【Python基础5】列表和元组
查看>>
【Python基础6】格式化字符串
查看>>