//用来锁住屏障的锁 privatefinal ReentrantLock lock = new ReentrantLock(); //需要等待的条件 privatefinal Condition trip = lock.newCondition(); //参与等待的数量 privatefinalint parties; //当栅栏释放之后,需要执行的命令 privatefinal Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation();
/** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ privateint count;
privateintdowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //先获取到当前的锁,然后锁住当前的线程 lock.lock(); try { final Generation g = generation; //获取到当前的
if (g.broken) //如果当前需要进行等待的cyclicBarrier已经被打破,会抛出异常 thrownew BrokenBarrierException();
if (Thread.interrupted()) { breakBarrier(); //如果当前线程被中断了,则会打破屏障,然后通知所有等待的线程 thrownew InterruptedException(); }
int index = --count; //对count数量进行减一,然后获取到当前的序号 if (index == 0) { // tripped Runnable command = barrierCommand; if (command != null) { //如果count在自减之后为0,说明可以打破屏障了,执行传入的命令 try { command.run(); } catch (Throwable ex) { breakBarrier(); throw ex; } } nextGeneration(); //唤醒所有的线程,然后准备进行下一个周期 return0; }
// loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //如果没有设置超时时间,就一直等待,释放锁 trip.await(); elseif (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } }
if (g.broken) thrownew BrokenBarrierException(); //如果是因为出现了异常而被唤醒的,则抛出异常
if (g != generation) //如果周期相同,说明没有进入到下一个周期,等待进入下一个周期 return index;
privatevoidnextGeneration(){ // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }