GNU Classpath (0.97.2) | |
Frames | No Frames |
1: /* 2: * Written by Doug Lea with assistance from members of JCP JSR-166 3: * Expert Group and released to the public domain, as explained at 4: * http://creativecommons.org/licenses/publicdomain 5: */ 6: 7: package java.util.concurrent; 8: import java.util.*; 9: import java.util.concurrent.locks.*; 10: import java.util.concurrent.atomic.*; 11: 12: /** 13: * A counting semaphore. Conceptually, a semaphore maintains a set of 14: * permits. Each {@link #acquire} blocks if necessary until a permit is 15: * available, and then takes it. Each {@link #release} adds a permit, 16: * potentially releasing a blocking acquirer. 17: * However, no actual permit objects are used; the {@code Semaphore} just 18: * keeps a count of the number available and acts accordingly. 19: * 20: * <p>Semaphores are often used to restrict the number of threads than can 21: * access some (physical or logical) resource. For example, here is 22: * a class that uses a semaphore to control access to a pool of items: 23: * <pre> 24: * class Pool { 25: * private static final int MAX_AVAILABLE = 100; 26: * private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); 27: * 28: * public Object getItem() throws InterruptedException { 29: * available.acquire(); 30: * return getNextAvailableItem(); 31: * } 32: * 33: * public void putItem(Object x) { 34: * if (markAsUnused(x)) 35: * available.release(); 36: * } 37: * 38: * // Not a particularly efficient data structure; just for demo 39: * 40: * protected Object[] items = ... whatever kinds of items being managed 41: * protected boolean[] used = new boolean[MAX_AVAILABLE]; 42: * 43: * protected synchronized Object getNextAvailableItem() { 44: * for (int i = 0; i < MAX_AVAILABLE; ++i) { 45: * if (!used[i]) { 46: * used[i] = true; 47: * return items[i]; 48: * } 49: * } 50: * return null; // not reached 51: * } 52: * 53: * protected synchronized boolean markAsUnused(Object item) { 54: * for (int i = 0; i < MAX_AVAILABLE; ++i) { 55: * if (item == items[i]) { 56: * if (used[i]) { 57: * used[i] = false; 58: * return true; 59: * } else 60: * return false; 61: * } 62: * } 63: * return false; 64: * } 65: * 66: * } 67: * </pre> 68: * 69: * <p>Before obtaining an item each thread must acquire a permit from 70: * the semaphore, guaranteeing that an item is available for use. When 71: * the thread has finished with the item it is returned back to the 72: * pool and a permit is returned to the semaphore, allowing another 73: * thread to acquire that item. Note that no synchronization lock is 74: * held when {@link #acquire} is called as that would prevent an item 75: * from being returned to the pool. The semaphore encapsulates the 76: * synchronization needed to restrict access to the pool, separately 77: * from any synchronization needed to maintain the consistency of the 78: * pool itself. 79: * 80: * <p>A semaphore initialized to one, and which is used such that it 81: * only has at most one permit available, can serve as a mutual 82: * exclusion lock. This is more commonly known as a <em>binary 83: * semaphore</em>, because it only has two states: one permit 84: * available, or zero permits available. When used in this way, the 85: * binary semaphore has the property (unlike many {@link Lock} 86: * implementations), that the "lock" can be released by a 87: * thread other than the owner (as semaphores have no notion of 88: * ownership). This can be useful in some specialized contexts, such 89: * as deadlock recovery. 90: * 91: * <p> The constructor for this class optionally accepts a 92: * <em>fairness</em> parameter. When set false, this class makes no 93: * guarantees about the order in which threads acquire permits. In 94: * particular, <em>barging</em> is permitted, that is, a thread 95: * invoking {@link #acquire} can be allocated a permit ahead of a 96: * thread that has been waiting - logically the new thread places itself at 97: * the head of the queue of waiting threads. When fairness is set true, the 98: * semaphore guarantees that threads invoking any of the {@link 99: * #acquire() acquire} methods are selected to obtain permits in the order in 100: * which their invocation of those methods was processed 101: * (first-in-first-out; FIFO). Note that FIFO ordering necessarily 102: * applies to specific internal points of execution within these 103: * methods. So, it is possible for one thread to invoke 104: * {@code acquire} before another, but reach the ordering point after 105: * the other, and similarly upon return from the method. 106: * Also note that the untimed {@link #tryAcquire() tryAcquire} methods do not 107: * honor the fairness setting, but will take any permits that are 108: * available. 109: * 110: * <p>Generally, semaphores used to control resource access should be 111: * initialized as fair, to ensure that no thread is starved out from 112: * accessing a resource. When using semaphores for other kinds of 113: * synchronization control, the throughput advantages of non-fair 114: * ordering often outweigh fairness considerations. 115: * 116: * <p>This class also provides convenience methods to {@link 117: * #acquire(int) acquire} and {@link #release(int) release} multiple 118: * permits at a time. Beware of the increased risk of indefinite 119: * postponement when these methods are used without fairness set true. 120: * 121: * <p>Memory consistency effects: Actions in a thread prior to calling 122: * a "release" method such as {@code release()} 123: * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 124: * actions following a successful "acquire" method such as {@code acquire()} 125: * in another thread. 126: * 127: * @since 1.5 128: * @author Doug Lea 129: * 130: */ 131: 132: public class Semaphore implements java.io.Serializable { 133: private static final long serialVersionUID = -3222578661600680210L; 134: /** All mechanics via AbstractQueuedSynchronizer subclass */ 135: private final Sync sync; 136: 137: /** 138: * Synchronization implementation for semaphore. Uses AQS state 139: * to represent permits. Subclassed into fair and nonfair 140: * versions. 141: */ 142: abstract static class Sync extends AbstractQueuedSynchronizer { 143: private static final long serialVersionUID = 1192457210091910933L; 144: 145: Sync(int permits) { 146: setState(permits); 147: } 148: 149: final int getPermits() { 150: return getState(); 151: } 152: 153: final int nonfairTryAcquireShared(int acquires) { 154: for (;;) { 155: int available = getState(); 156: int remaining = available - acquires; 157: if (remaining < 0 || 158: compareAndSetState(available, remaining)) 159: return remaining; 160: } 161: } 162: 163: protected final boolean tryReleaseShared(int releases) { 164: for (;;) { 165: int p = getState(); 166: if (compareAndSetState(p, p + releases)) 167: return true; 168: } 169: } 170: 171: final void reducePermits(int reductions) { 172: for (;;) { 173: int current = getState(); 174: int next = current - reductions; 175: if (compareAndSetState(current, next)) 176: return; 177: } 178: } 179: 180: final int drainPermits() { 181: for (;;) { 182: int current = getState(); 183: if (current == 0 || compareAndSetState(current, 0)) 184: return current; 185: } 186: } 187: } 188: 189: /** 190: * NonFair version 191: */ 192: final static class NonfairSync extends Sync { 193: private static final long serialVersionUID = -2694183684443567898L; 194: 195: NonfairSync(int permits) { 196: super(permits); 197: } 198: 199: protected int tryAcquireShared(int acquires) { 200: return nonfairTryAcquireShared(acquires); 201: } 202: } 203: 204: /** 205: * Fair version 206: */ 207: final static class FairSync extends Sync { 208: private static final long serialVersionUID = 2014338818796000944L; 209: 210: FairSync(int permits) { 211: super(permits); 212: } 213: 214: protected int tryAcquireShared(int acquires) { 215: Thread current = Thread.currentThread(); 216: for (;;) { 217: Thread first = getFirstQueuedThread(); 218: if (first != null && first != current) 219: return -1; 220: int available = getState(); 221: int remaining = available - acquires; 222: if (remaining < 0 || 223: compareAndSetState(available, remaining)) 224: return remaining; 225: } 226: } 227: } 228: 229: /** 230: * Creates a {@code Semaphore} with the given number of 231: * permits and nonfair fairness setting. 232: * 233: * @param permits the initial number of permits available. 234: * This value may be negative, in which case releases 235: * must occur before any acquires will be granted. 236: */ 237: public Semaphore(int permits) { 238: sync = new NonfairSync(permits); 239: } 240: 241: /** 242: * Creates a {@code Semaphore} with the given number of 243: * permits and the given fairness setting. 244: * 245: * @param permits the initial number of permits available. 246: * This value may be negative, in which case releases 247: * must occur before any acquires will be granted. 248: * @param fair {@code true} if this semaphore will guarantee 249: * first-in first-out granting of permits under contention, 250: * else {@code false} 251: */ 252: public Semaphore(int permits, boolean fair) { 253: sync = (fair)? new FairSync(permits) : new NonfairSync(permits); 254: } 255: 256: /** 257: * Acquires a permit from this semaphore, blocking until one is 258: * available, or the thread is {@linkplain Thread#interrupt interrupted}. 259: * 260: * <p>Acquires a permit, if one is available and returns immediately, 261: * reducing the number of available permits by one. 262: * 263: * <p>If no permit is available then the current thread becomes 264: * disabled for thread scheduling purposes and lies dormant until 265: * one of two things happens: 266: * <ul> 267: * <li>Some other thread invokes the {@link #release} method for this 268: * semaphore and the current thread is next to be assigned a permit; or 269: * <li>Some other thread {@linkplain Thread#interrupt interrupts} 270: * the current thread. 271: * </ul> 272: * 273: * <p>If the current thread: 274: * <ul> 275: * <li>has its interrupted status set on entry to this method; or 276: * <li>is {@linkplain Thread#interrupt interrupted} while waiting 277: * for a permit, 278: * </ul> 279: * then {@link InterruptedException} is thrown and the current thread's 280: * interrupted status is cleared. 281: * 282: * @throws InterruptedException if the current thread is interrupted 283: */ 284: public void acquire() throws InterruptedException { 285: sync.acquireSharedInterruptibly(1); 286: } 287: 288: /** 289: * Acquires a permit from this semaphore, blocking until one is 290: * available. 291: * 292: * <p>Acquires a permit, if one is available and returns immediately, 293: * reducing the number of available permits by one. 294: * 295: * <p>If no permit is available then the current thread becomes 296: * disabled for thread scheduling purposes and lies dormant until 297: * some other thread invokes the {@link #release} method for this 298: * semaphore and the current thread is next to be assigned a permit. 299: * 300: * <p>If the current thread is {@linkplain Thread#interrupt interrupted} 301: * while waiting for a permit then it will continue to wait, but the 302: * time at which the thread is assigned a permit may change compared to 303: * the time it would have received the permit had no interruption 304: * occurred. When the thread does return from this method its interrupt 305: * status will be set. 306: */ 307: public void acquireUninterruptibly() { 308: sync.acquireShared(1); 309: } 310: 311: /** 312: * Acquires a permit from this semaphore, only if one is available at the 313: * time of invocation. 314: * 315: * <p>Acquires a permit, if one is available and returns immediately, 316: * with the value {@code true}, 317: * reducing the number of available permits by one. 318: * 319: * <p>If no permit is available then this method will return 320: * immediately with the value {@code false}. 321: * 322: * <p>Even when this semaphore has been set to use a 323: * fair ordering policy, a call to {@code tryAcquire()} <em>will</em> 324: * immediately acquire a permit if one is available, whether or not 325: * other threads are currently waiting. 326: * This "barging" behavior can be useful in certain 327: * circumstances, even though it breaks fairness. If you want to honor 328: * the fairness setting, then use 329: * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) } 330: * which is almost equivalent (it also detects interruption). 331: * 332: * @return {@code true} if a permit was acquired and {@code false} 333: * otherwise 334: */ 335: public boolean tryAcquire() { 336: return sync.nonfairTryAcquireShared(1) >= 0; 337: } 338: 339: /** 340: * Acquires a permit from this semaphore, if one becomes available 341: * within the given waiting time and the current thread has not 342: * been {@linkplain Thread#interrupt interrupted}. 343: * 344: * <p>Acquires a permit, if one is available and returns immediately, 345: * with the value {@code true}, 346: * reducing the number of available permits by one. 347: * 348: * <p>If no permit is available then the current thread becomes 349: * disabled for thread scheduling purposes and lies dormant until 350: * one of three things happens: 351: * <ul> 352: * <li>Some other thread invokes the {@link #release} method for this 353: * semaphore and the current thread is next to be assigned a permit; or 354: * <li>Some other thread {@linkplain Thread#interrupt interrupts} 355: * the current thread; or 356: * <li>The specified waiting time elapses. 357: * </ul> 358: * 359: * <p>If a permit is acquired then the value {@code true} is returned. 360: * 361: * <p>If the current thread: 362: * <ul> 363: * <li>has its interrupted status set on entry to this method; or 364: * <li>is {@linkplain Thread#interrupt interrupted} while waiting 365: * to acquire a permit, 366: * </ul> 367: * then {@link InterruptedException} is thrown and the current thread's 368: * interrupted status is cleared. 369: * 370: * <p>If the specified waiting time elapses then the value {@code false} 371: * is returned. If the time is less than or equal to zero, the method 372: * will not wait at all. 373: * 374: * @param timeout the maximum time to wait for a permit 375: * @param unit the time unit of the {@code timeout} argument 376: * @return {@code true} if a permit was acquired and {@code false} 377: * if the waiting time elapsed before a permit was acquired 378: * @throws InterruptedException if the current thread is interrupted 379: */ 380: public boolean tryAcquire(long timeout, TimeUnit unit) 381: throws InterruptedException { 382: return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 383: } 384: 385: /** 386: * Releases a permit, returning it to the semaphore. 387: * 388: * <p>Releases a permit, increasing the number of available permits by 389: * one. If any threads are trying to acquire a permit, then one is 390: * selected and given the permit that was just released. That thread 391: * is (re)enabled for thread scheduling purposes. 392: * 393: * <p>There is no requirement that a thread that releases a permit must 394: * have acquired that permit by calling {@link #acquire}. 395: * Correct usage of a semaphore is established by programming convention 396: * in the application. 397: */ 398: public void release() { 399: sync.releaseShared(1); 400: } 401: 402: /** 403: * Acquires the given number of permits from this semaphore, 404: * blocking until all are available, 405: * or the thread is {@linkplain Thread#interrupt interrupted}. 406: * 407: * <p>Acquires the given number of permits, if they are available, 408: * and returns immediately, reducing the number of available permits 409: * by the given amount. 410: * 411: * <p>If insufficient permits are available then the current thread becomes 412: * disabled for thread scheduling purposes and lies dormant until 413: * one of two things happens: 414: * <ul> 415: * <li>Some other thread invokes one of the {@link #release() release} 416: * methods for this semaphore, the current thread is next to be assigned 417: * permits and the number of available permits satisfies this request; or 418: * <li>Some other thread {@linkplain Thread#interrupt interrupts} 419: * the current thread. 420: * </ul> 421: * 422: * <p>If the current thread: 423: * <ul> 424: * <li>has its interrupted status set on entry to this method; or 425: * <li>is {@linkplain Thread#interrupt interrupted} while waiting 426: * for a permit, 427: * </ul> 428: * then {@link InterruptedException} is thrown and the current thread's 429: * interrupted status is cleared. 430: * Any permits that were to be assigned to this thread are instead 431: * assigned to other threads trying to acquire permits, as if 432: * permits had been made available by a call to {@link #release()}. 433: * 434: * @param permits the number of permits to acquire 435: * @throws InterruptedException if the current thread is interrupted 436: * @throws IllegalArgumentException if {@code permits} is negative 437: */ 438: public void acquire(int permits) throws InterruptedException { 439: if (permits < 0) throw new IllegalArgumentException(); 440: sync.acquireSharedInterruptibly(permits); 441: } 442: 443: /** 444: * Acquires the given number of permits from this semaphore, 445: * blocking until all are available. 446: * 447: * <p>Acquires the given number of permits, if they are available, 448: * and returns immediately, reducing the number of available permits 449: * by the given amount. 450: * 451: * <p>If insufficient permits are available then the current thread becomes 452: * disabled for thread scheduling purposes and lies dormant until 453: * some other thread invokes one of the {@link #release() release} 454: * methods for this semaphore, the current thread is next to be assigned 455: * permits and the number of available permits satisfies this request. 456: * 457: * <p>If the current thread is {@linkplain Thread#interrupt interrupted} 458: * while waiting for permits then it will continue to wait and its 459: * position in the queue is not affected. When the thread does return 460: * from this method its interrupt status will be set. 461: * 462: * @param permits the number of permits to acquire 463: * @throws IllegalArgumentException if {@code permits} is negative 464: * 465: */ 466: public void acquireUninterruptibly(int permits) { 467: if (permits < 0) throw new IllegalArgumentException(); 468: sync.acquireShared(permits); 469: } 470: 471: /** 472: * Acquires the given number of permits from this semaphore, only 473: * if all are available at the time of invocation. 474: * 475: * <p>Acquires the given number of permits, if they are available, and 476: * returns immediately, with the value {@code true}, 477: * reducing the number of available permits by the given amount. 478: * 479: * <p>If insufficient permits are available then this method will return 480: * immediately with the value {@code false} and the number of available 481: * permits is unchanged. 482: * 483: * <p>Even when this semaphore has been set to use a fair ordering 484: * policy, a call to {@code tryAcquire} <em>will</em> 485: * immediately acquire a permit if one is available, whether or 486: * not other threads are currently waiting. This 487: * "barging" behavior can be useful in certain 488: * circumstances, even though it breaks fairness. If you want to 489: * honor the fairness setting, then use {@link #tryAcquire(int, 490: * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) } 491: * which is almost equivalent (it also detects interruption). 492: * 493: * @param permits the number of permits to acquire 494: * @return {@code true} if the permits were acquired and 495: * {@code false} otherwise 496: * @throws IllegalArgumentException if {@code permits} is negative 497: */ 498: public boolean tryAcquire(int permits) { 499: if (permits < 0) throw new IllegalArgumentException(); 500: return sync.nonfairTryAcquireShared(permits) >= 0; 501: } 502: 503: /** 504: * Acquires the given number of permits from this semaphore, if all 505: * become available within the given waiting time and the current 506: * thread has not been {@linkplain Thread#interrupt interrupted}. 507: * 508: * <p>Acquires the given number of permits, if they are available and 509: * returns immediately, with the value {@code true}, 510: * reducing the number of available permits by the given amount. 511: * 512: * <p>If insufficient permits are available then 513: * the current thread becomes disabled for thread scheduling 514: * purposes and lies dormant until one of three things happens: 515: * <ul> 516: * <li>Some other thread invokes one of the {@link #release() release} 517: * methods for this semaphore, the current thread is next to be assigned 518: * permits and the number of available permits satisfies this request; or 519: * <li>Some other thread {@linkplain Thread#interrupt interrupts} 520: * the current thread; or 521: * <li>The specified waiting time elapses. 522: * </ul> 523: * 524: * <p>If the permits are acquired then the value {@code true} is returned. 525: * 526: * <p>If the current thread: 527: * <ul> 528: * <li>has its interrupted status set on entry to this method; or 529: * <li>is {@linkplain Thread#interrupt interrupted} while waiting 530: * to acquire the permits, 531: * </ul> 532: * then {@link InterruptedException} is thrown and the current thread's 533: * interrupted status is cleared. 534: * Any permits that were to be assigned to this thread, are instead 535: * assigned to other threads trying to acquire permits, as if 536: * the permits had been made available by a call to {@link #release()}. 537: * 538: * <p>If the specified waiting time elapses then the value {@code false} 539: * is returned. If the time is less than or equal to zero, the method 540: * will not wait at all. Any permits that were to be assigned to this 541: * thread, are instead assigned to other threads trying to acquire 542: * permits, as if the permits had been made available by a call to 543: * {@link #release()}. 544: * 545: * @param permits the number of permits to acquire 546: * @param timeout the maximum time to wait for the permits 547: * @param unit the time unit of the {@code timeout} argument 548: * @return {@code true} if all permits were acquired and {@code false} 549: * if the waiting time elapsed before all permits were acquired 550: * @throws InterruptedException if the current thread is interrupted 551: * @throws IllegalArgumentException if {@code permits} is negative 552: */ 553: public boolean tryAcquire(int permits, long timeout, TimeUnit unit) 554: throws InterruptedException { 555: if (permits < 0) throw new IllegalArgumentException(); 556: return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); 557: } 558: 559: /** 560: * Releases the given number of permits, returning them to the semaphore. 561: * 562: * <p>Releases the given number of permits, increasing the number of 563: * available permits by that amount. 564: * If any threads are trying to acquire permits, then one 565: * is selected and given the permits that were just released. 566: * If the number of available permits satisfies that thread's request 567: * then that thread is (re)enabled for thread scheduling purposes; 568: * otherwise the thread will wait until sufficient permits are available. 569: * If there are still permits available 570: * after this thread's request has been satisfied, then those permits 571: * are assigned in turn to other threads trying to acquire permits. 572: * 573: * <p>There is no requirement that a thread that releases a permit must 574: * have acquired that permit by calling {@link Semaphore#acquire acquire}. 575: * Correct usage of a semaphore is established by programming convention 576: * in the application. 577: * 578: * @param permits the number of permits to release 579: * @throws IllegalArgumentException if {@code permits} is negative 580: */ 581: public void release(int permits) { 582: if (permits < 0) throw new IllegalArgumentException(); 583: sync.releaseShared(permits); 584: } 585: 586: /** 587: * Returns the current number of permits available in this semaphore. 588: * 589: * <p>This method is typically used for debugging and testing purposes. 590: * 591: * @return the number of permits available in this semaphore 592: */ 593: public int availablePermits() { 594: return sync.getPermits(); 595: } 596: 597: /** 598: * Acquires and returns all permits that are immediately available. 599: * 600: * @return the number of permits acquired 601: */ 602: public int drainPermits() { 603: return sync.drainPermits(); 604: } 605: 606: /** 607: * Shrinks the number of available permits by the indicated 608: * reduction. This method can be useful in subclasses that use 609: * semaphores to track resources that become unavailable. This 610: * method differs from {@code acquire} in that it does not block 611: * waiting for permits to become available. 612: * 613: * @param reduction the number of permits to remove 614: * @throws IllegalArgumentException if {@code reduction} is negative 615: */ 616: protected void reducePermits(int reduction) { 617: if (reduction < 0) throw new IllegalArgumentException(); 618: sync.reducePermits(reduction); 619: } 620: 621: /** 622: * Returns {@code true} if this semaphore has fairness set true. 623: * 624: * @return {@code true} if this semaphore has fairness set true 625: */ 626: public boolean isFair() { 627: return sync instanceof FairSync; 628: } 629: 630: /** 631: * Queries whether any threads are waiting to acquire. Note that 632: * because cancellations may occur at any time, a {@code true} 633: * return does not guarantee that any other thread will ever 634: * acquire. This method is designed primarily for use in 635: * monitoring of the system state. 636: * 637: * @return {@code true} if there may be other threads waiting to 638: * acquire the lock 639: */ 640: public final boolean hasQueuedThreads() { 641: return sync.hasQueuedThreads(); 642: } 643: 644: /** 645: * Returns an estimate of the number of threads waiting to acquire. 646: * The value is only an estimate because the number of threads may 647: * change dynamically while this method traverses internal data 648: * structures. This method is designed for use in monitoring of the 649: * system state, not for synchronization control. 650: * 651: * @return the estimated number of threads waiting for this lock 652: */ 653: public final int getQueueLength() { 654: return sync.getQueueLength(); 655: } 656: 657: /** 658: * Returns a collection containing threads that may be waiting to acquire. 659: * Because the actual set of threads may change dynamically while 660: * constructing this result, the returned collection is only a best-effort 661: * estimate. The elements of the returned collection are in no particular 662: * order. This method is designed to facilitate construction of 663: * subclasses that provide more extensive monitoring facilities. 664: * 665: * @return the collection of threads 666: */ 667: protected Collection<Thread> getQueuedThreads() { 668: return sync.getQueuedThreads(); 669: } 670: 671: /** 672: * Returns a string identifying this semaphore, as well as its state. 673: * The state, in brackets, includes the String {@code "Permits ="} 674: * followed by the number of permits. 675: * 676: * @return a string identifying this semaphore, as well as its state 677: */ 678: public String toString() { 679: return super.toString() + "[Permits = " + sync.getPermits() + "]"; 680: } 681: }
GNU Classpath (0.97.2) |