GNU Classpath (0.97.2) | |
Frames | No Frames |
1: /* 2: * Written by Doug Lea with assistance from members of JCP JSR-166 3: * Expert Group and released to the public domain, as explained at 4: * http://creativecommons.org/licenses/publicdomain 5: */ 6: 7: package java.util.concurrent; 8: import java.util.concurrent.locks.*; 9: 10: /** 11: * A synchronization aid that allows a set of threads to all wait for 12: * each other to reach a common barrier point. CyclicBarriers are 13: * useful in programs involving a fixed sized party of threads that 14: * must occasionally wait for each other. The barrier is called 15: * <em>cyclic</em> because it can be re-used after the waiting threads 16: * are released. 17: * 18: * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command 19: * that is run once per barrier point, after the last thread in the party 20: * arrives, but before any threads are released. 21: * This <em>barrier action</em> is useful 22: * for updating shared-state before any of the parties continue. 23: * 24: * <p><b>Sample usage:</b> Here is an example of 25: * using a barrier in a parallel decomposition design: 26: * <pre> 27: * class Solver { 28: * final int N; 29: * final float[][] data; 30: * final CyclicBarrier barrier; 31: * 32: * class Worker implements Runnable { 33: * int myRow; 34: * Worker(int row) { myRow = row; } 35: * public void run() { 36: * while (!done()) { 37: * processRow(myRow); 38: * 39: * try { 40: * barrier.await(); 41: * } catch (InterruptedException ex) { 42: * return; 43: * } catch (BrokenBarrierException ex) { 44: * return; 45: * } 46: * } 47: * } 48: * } 49: * 50: * public Solver(float[][] matrix) { 51: * data = matrix; 52: * N = matrix.length; 53: * barrier = new CyclicBarrier(N, 54: * new Runnable() { 55: * public void run() { 56: * mergeRows(...); 57: * } 58: * }); 59: * for (int i = 0; i < N; ++i) 60: * new Thread(new Worker(i)).start(); 61: * 62: * waitUntilDone(); 63: * } 64: * } 65: * </pre> 66: * Here, each worker thread processes a row of the matrix then waits at the 67: * barrier until all rows have been processed. When all rows are processed 68: * the supplied {@link Runnable} barrier action is executed and merges the 69: * rows. If the merger 70: * determines that a solution has been found then <tt>done()</tt> will return 71: * <tt>true</tt> and each worker will terminate. 72: * 73: * <p>If the barrier action does not rely on the parties being suspended when 74: * it is executed, then any of the threads in the party could execute that 75: * action when it is released. To facilitate this, each invocation of 76: * {@link #await} returns the arrival index of that thread at the barrier. 77: * You can then choose which thread should execute the barrier action, for 78: * example: 79: * <pre> if (barrier.await() == 0) { 80: * // log the completion of this iteration 81: * }</pre> 82: * 83: * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model 84: * for failed synchronization attempts: If a thread leaves a barrier 85: * point prematurely because of interruption, failure, or timeout, all 86: * other threads waiting at that barrier point will also leave 87: * abnormally via {@link BrokenBarrierException} (or 88: * {@link InterruptedException} if they too were interrupted at about 89: * the same time). 90: * 91: * <p>Memory consistency effects: Actions in a thread prior to calling 92: * {@code await()} 93: * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 94: * actions that are part of the barrier action, which in turn 95: * <i>happen-before</i> actions following a successful return from the 96: * corresponding {@code await()} in other threads. 97: * 98: * @since 1.5 99: * @see CountDownLatch 100: * 101: * @author Doug Lea 102: */ 103: public class CyclicBarrier { 104: /** 105: * Each use of the barrier is represented as a generation instance. 106: * The generation changes whenever the barrier is tripped, or 107: * is reset. There can be many generations associated with threads 108: * using the barrier - due to the non-deterministic way the lock 109: * may be allocated to waiting threads - but only one of these 110: * can be active at a time (the one to which <tt>count</tt> applies) 111: * and all the rest are either broken or tripped. 112: * There need not be an active generation if there has been a break 113: * but no subsequent reset. 114: */ 115: private static class Generation { 116: boolean broken = false; 117: } 118: 119: /** The lock for guarding barrier entry */ 120: private final ReentrantLock lock = new ReentrantLock(); 121: /** Condition to wait on until tripped */ 122: private final Condition trip = lock.newCondition(); 123: /** The number of parties */ 124: private final int parties; 125: /* The command to run when tripped */ 126: private final Runnable barrierCommand; 127: /** The current generation */ 128: private Generation generation = new Generation(); 129: 130: /** 131: * Number of parties still waiting. Counts down from parties to 0 132: * on each generation. It is reset to parties on each new 133: * generation or when broken. 134: */ 135: private int count; 136: 137: /** 138: * Updates state on barrier trip and wakes up everyone. 139: * Called only while holding lock. 140: */ 141: private void nextGeneration() { 142: // signal completion of last generation 143: trip.signalAll(); 144: // set up next generation 145: count = parties; 146: generation = new Generation(); 147: } 148: 149: /** 150: * Sets current barrier generation as broken and wakes up everyone. 151: * Called only while holding lock. 152: */ 153: private void breakBarrier() { 154: generation.broken = true; 155: count = parties; 156: trip.signalAll(); 157: } 158: 159: /** 160: * Main barrier code, covering the various policies. 161: */ 162: private int dowait(boolean timed, long nanos) 163: throws InterruptedException, BrokenBarrierException, 164: TimeoutException { 165: final ReentrantLock lock = this.lock; 166: lock.lock(); 167: try { 168: final Generation g = generation; 169: 170: if (g.broken) 171: throw new BrokenBarrierException(); 172: 173: if (Thread.interrupted()) { 174: breakBarrier(); 175: throw new InterruptedException(); 176: } 177: 178: int index = --count; 179: if (index == 0) { // tripped 180: boolean ranAction = false; 181: try { 182: final Runnable command = barrierCommand; 183: if (command != null) 184: command.run(); 185: ranAction = true; 186: nextGeneration(); 187: return 0; 188: } finally { 189: if (!ranAction) 190: breakBarrier(); 191: } 192: } 193: 194: // loop until tripped, broken, interrupted, or timed out 195: for (;;) { 196: try { 197: if (!timed) 198: trip.await(); 199: else if (nanos > 0L) 200: nanos = trip.awaitNanos(nanos); 201: } catch (InterruptedException ie) { 202: if (g == generation && ! g.broken) { 203: breakBarrier(); 204: throw ie; 205: } else { 206: // We're about to finish waiting even if we had not 207: // been interrupted, so this interrupt is deemed to 208: // "belong" to subsequent execution. 209: Thread.currentThread().interrupt(); 210: } 211: } 212: 213: if (g.broken) 214: throw new BrokenBarrierException(); 215: 216: if (g != generation) 217: return index; 218: 219: if (timed && nanos <= 0L) { 220: breakBarrier(); 221: throw new TimeoutException(); 222: } 223: } 224: } finally { 225: lock.unlock(); 226: } 227: } 228: 229: /** 230: * Creates a new <tt>CyclicBarrier</tt> that will trip when the 231: * given number of parties (threads) are waiting upon it, and which 232: * will execute the given barrier action when the barrier is tripped, 233: * performed by the last thread entering the barrier. 234: * 235: * @param parties the number of threads that must invoke {@link #await} 236: * before the barrier is tripped 237: * @param barrierAction the command to execute when the barrier is 238: * tripped, or {@code null} if there is no action 239: * @throws IllegalArgumentException if {@code parties} is less than 1 240: */ 241: public CyclicBarrier(int parties, Runnable barrierAction) { 242: if (parties <= 0) throw new IllegalArgumentException(); 243: this.parties = parties; 244: this.count = parties; 245: this.barrierCommand = barrierAction; 246: } 247: 248: /** 249: * Creates a new <tt>CyclicBarrier</tt> that will trip when the 250: * given number of parties (threads) are waiting upon it, and 251: * does not perform a predefined action when the barrier is tripped. 252: * 253: * @param parties the number of threads that must invoke {@link #await} 254: * before the barrier is tripped 255: * @throws IllegalArgumentException if {@code parties} is less than 1 256: */ 257: public CyclicBarrier(int parties) { 258: this(parties, null); 259: } 260: 261: /** 262: * Returns the number of parties required to trip this barrier. 263: * 264: * @return the number of parties required to trip this barrier 265: */ 266: public int getParties() { 267: return parties; 268: } 269: 270: /** 271: * Waits until all {@linkplain #getParties parties} have invoked 272: * <tt>await</tt> on this barrier. 273: * 274: * <p>If the current thread is not the last to arrive then it is 275: * disabled for thread scheduling purposes and lies dormant until 276: * one of the following things happens: 277: * <ul> 278: * <li>The last thread arrives; or 279: * <li>Some other thread {@linkplain Thread#interrupt interrupts} 280: * the current thread; or 281: * <li>Some other thread {@linkplain Thread#interrupt interrupts} 282: * one of the other waiting threads; or 283: * <li>Some other thread times out while waiting for barrier; or 284: * <li>Some other thread invokes {@link #reset} on this barrier. 285: * </ul> 286: * 287: * <p>If the current thread: 288: * <ul> 289: * <li>has its interrupted status set on entry to this method; or 290: * <li>is {@linkplain Thread#interrupt interrupted} while waiting 291: * </ul> 292: * then {@link InterruptedException} is thrown and the current thread's 293: * interrupted status is cleared. 294: * 295: * <p>If the barrier is {@link #reset} while any thread is waiting, 296: * or if the barrier {@linkplain #isBroken is broken} when 297: * <tt>await</tt> is invoked, or while any thread is waiting, then 298: * {@link BrokenBarrierException} is thrown. 299: * 300: * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting, 301: * then all other waiting threads will throw 302: * {@link BrokenBarrierException} and the barrier is placed in the broken 303: * state. 304: * 305: * <p>If the current thread is the last thread to arrive, and a 306: * non-null barrier action was supplied in the constructor, then the 307: * current thread runs the action before allowing the other threads to 308: * continue. 309: * If an exception occurs during the barrier action then that exception 310: * will be propagated in the current thread and the barrier is placed in 311: * the broken state. 312: * 313: * @return the arrival index of the current thread, where index 314: * <tt>{@link #getParties()} - 1</tt> indicates the first 315: * to arrive and zero indicates the last to arrive 316: * @throws InterruptedException if the current thread was interrupted 317: * while waiting 318: * @throws BrokenBarrierException if <em>another</em> thread was 319: * interrupted or timed out while the current thread was 320: * waiting, or the barrier was reset, or the barrier was 321: * broken when {@code await} was called, or the barrier 322: * action (if present) failed due an exception. 323: */ 324: public int await() throws InterruptedException, BrokenBarrierException { 325: try { 326: return dowait(false, 0L); 327: } catch (TimeoutException toe) { 328: throw new Error(toe); // cannot happen; 329: } 330: } 331: 332: /** 333: * Waits until all {@linkplain #getParties parties} have invoked 334: * <tt>await</tt> on this barrier, or the specified waiting time elapses. 335: * 336: * <p>If the current thread is not the last to arrive then it is 337: * disabled for thread scheduling purposes and lies dormant until 338: * one of the following things happens: 339: * <ul> 340: * <li>The last thread arrives; or 341: * <li>The specified timeout elapses; or 342: * <li>Some other thread {@linkplain Thread#interrupt interrupts} 343: * the current thread; or 344: * <li>Some other thread {@linkplain Thread#interrupt interrupts} 345: * one of the other waiting threads; or 346: * <li>Some other thread times out while waiting for barrier; or 347: * <li>Some other thread invokes {@link #reset} on this barrier. 348: * </ul> 349: * 350: * <p>If the current thread: 351: * <ul> 352: * <li>has its interrupted status set on entry to this method; or 353: * <li>is {@linkplain Thread#interrupt interrupted} while waiting 354: * </ul> 355: * then {@link InterruptedException} is thrown and the current thread's 356: * interrupted status is cleared. 357: * 358: * <p>If the specified waiting time elapses then {@link TimeoutException} 359: * is thrown. If the time is less than or equal to zero, the 360: * method will not wait at all. 361: * 362: * <p>If the barrier is {@link #reset} while any thread is waiting, 363: * or if the barrier {@linkplain #isBroken is broken} when 364: * <tt>await</tt> is invoked, or while any thread is waiting, then 365: * {@link BrokenBarrierException} is thrown. 366: * 367: * <p>If any thread is {@linkplain Thread#interrupt interrupted} while 368: * waiting, then all other waiting threads will throw {@link 369: * BrokenBarrierException} and the barrier is placed in the broken 370: * state. 371: * 372: * <p>If the current thread is the last thread to arrive, and a 373: * non-null barrier action was supplied in the constructor, then the 374: * current thread runs the action before allowing the other threads to 375: * continue. 376: * If an exception occurs during the barrier action then that exception 377: * will be propagated in the current thread and the barrier is placed in 378: * the broken state. 379: * 380: * @param timeout the time to wait for the barrier 381: * @param unit the time unit of the timeout parameter 382: * @return the arrival index of the current thread, where index 383: * <tt>{@link #getParties()} - 1</tt> indicates the first 384: * to arrive and zero indicates the last to arrive 385: * @throws InterruptedException if the current thread was interrupted 386: * while waiting 387: * @throws TimeoutException if the specified timeout elapses 388: * @throws BrokenBarrierException if <em>another</em> thread was 389: * interrupted or timed out while the current thread was 390: * waiting, or the barrier was reset, or the barrier was broken 391: * when {@code await} was called, or the barrier action (if 392: * present) failed due an exception 393: */ 394: public int await(long timeout, TimeUnit unit) 395: throws InterruptedException, 396: BrokenBarrierException, 397: TimeoutException { 398: return dowait(true, unit.toNanos(timeout)); 399: } 400: 401: /** 402: * Queries if this barrier is in a broken state. 403: * 404: * @return {@code true} if one or more parties broke out of this 405: * barrier due to interruption or timeout since 406: * construction or the last reset, or a barrier action 407: * failed due to an exception; {@code false} otherwise. 408: */ 409: public boolean isBroken() { 410: final ReentrantLock lock = this.lock; 411: lock.lock(); 412: try { 413: return generation.broken; 414: } finally { 415: lock.unlock(); 416: } 417: } 418: 419: /** 420: * Resets the barrier to its initial state. If any parties are 421: * currently waiting at the barrier, they will return with a 422: * {@link BrokenBarrierException}. Note that resets <em>after</em> 423: * a breakage has occurred for other reasons can be complicated to 424: * carry out; threads need to re-synchronize in some other way, 425: * and choose one to perform the reset. It may be preferable to 426: * instead create a new barrier for subsequent use. 427: */ 428: public void reset() { 429: final ReentrantLock lock = this.lock; 430: lock.lock(); 431: try { 432: breakBarrier(); // break the current generation 433: nextGeneration(); // start a new generation 434: } finally { 435: lock.unlock(); 436: } 437: } 438: 439: /** 440: * Returns the number of parties currently waiting at the barrier. 441: * This method is primarily useful for debugging and assertions. 442: * 443: * @return the number of parties currently blocked in {@link #await} 444: */ 445: public int getNumberWaiting() { 446: final ReentrantLock lock = this.lock; 447: lock.lock(); 448: try { 449: return parties - count; 450: } finally { 451: lock.unlock(); 452: } 453: } 454: }
GNU Classpath (0.97.2) |