本文共 5655 字,大约阅读时间需要 18 分钟。
简述:CyclicBarrier是提供为了让多个线程在达到某个点的情况下,再继续往下执行,举个例子,在计算excle表格中的数据时,每个线程进行每行数据的求和操作,在每个线程都执行结束之后,即到这个点上,再进行每行数据的求和操作,这就是CyclicBarrier要解决的问题。
先来看下javadoc中给出的示例代码
class Solver { final int N; final float[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { while (!done()) { processRow(myRow); //处理每行数据 try { barrier.await(); //处理结束后等待 } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } public Solver(float[][] matrix) { data = matrix; N = matrix.length; Runnable barrierAction = new Runnable() { public void run() { mergeRows(...); }}; //计算完每行操作之后,执行求和操作 barrier = new CyclicBarrier(N, barrierAction); //创建一个barrier对象 Listthreads = new ArrayList (N); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads) thread.join(); } }
上述代码是官网提供出来的一个使用场景,部分实例是伪代码
每个线程可以在barrier.await之前执行一部分计算,barrier.await会挂起当前线程,等到线程await的数量等于barrier设定的初始数量,之前被挂起的线程会被唤醒,执行线程当中barrier.await后面的代码逻辑
具体的示例代码
public class CyclicBarrierTest2 { static CyclicBarrier barrier = new CyclicBarrier(2,new A()); public static void main(String[] args) { new Thread(()->{ try{ barrier.await(); }catch (Exception e) { } System.out.println(1); }).start(); try{ barrier.await(); }catch (Exception e){ } System.out.println(2); }}class A implements Runnable{ @Override public void run() { System.out.println(3); }}
执行结果 3 1 2 或者 3 2 1
关于使用方式,及异常情况这篇日志给出了非常详细的示例,对于reset,等待超时,线程中断等情况下,其他线程受到的影响这篇文章都非常详细进行了分析
源码分析
关于CyclicBarrier他的实现方式是通过内部ReentrantLock来对资源进行加锁和释放锁操作,通过ReentrantLock中的condition来进行线程阻塞和线程的唤起,关于ReentrantLock不熟悉的话,可以参看我之前的日志
先来看下CylicBarrier属性
public class CyclicBarrier { /** 提供一个锁 */ private final ReentrantLock lock = new ReentrantLock(); /** condition来决定什么时候进行挂起和唤醒 */ private final Condition trip = lock.newCondition(); /** 初始化设定的阻塞线程数 */ private final int parties; /* 到达线程屏障后,需要执行的操作*/ private final Runnable barrierCommand; /** 内部类,维护boolean变量判断屏障是否失效 */ private Generation generation = new Generation();// 阻塞线程数-已经挂起的线程数 private int count;}
接下来我们看下当调用cyclicBarrier的await方法的实现逻辑
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
调用dowait方法,是整个cyclicBarrier实现的核心代码
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); //先对以下的操作进行加锁 try { final Generation g = generation; if (g.broken) // 如果已经broken 直接抛出异常 throw new BrokenBarrierException(); if (Thread.interrupted()) { //如果线程中断,generation中的broken设置为true, 并唤醒挂起的线程 breakBarrier(); //将count值重新设置成初始化的线程数 throw new InterruptedException(); //抛出中断异常 } int index = --count; // count值减一,没调用一次await方法,count值减一,初始化时等于设定的线程数 if (index == 0) { // tripped //如果所有的线程都已经执行了await方法 boolean ranAction = false; try { final Runnable command = barrierCommand; //判断到达屏障处,是否有其他的操作,如果是的话,在当前线程中执行 if (command != null) command.run(); ranAction = true; nextGeneration(); // 唤醒所有挂起的线程,将count的值,重新更新为初始时的值,generation也创建一个新的 return 0; } finally { if (!ranAction) breakBarrier(); } } // 如果当前所有线程还没有执行到await方法 for (;;) { try { if (!timed) trip.await(); //直接挂起线程,并在最后释放掉锁 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) //需要注意,在count的值减到0时,会重新创建一个generation, 这个时候g和之前不是同一个,return return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
以上就是CyclicBarrier的使用和源码分析,相对来说CyclicBarrier的源码是比较简洁易懂的
CyclicBarrier和CountDownLatch的区别就是CyclicBarrier是可以重复使用的,而CountDownLatch只能使用一次
参考:
转载地址:http://govti.baihongyu.com/