GNU Classpath (0.98)
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent.locks;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import sun.misc.Unsafe;

/**
 * Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues. This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic <tt>int</tt> value to represent state. Subclasses
 * must define the protected methods that change this state, and which
 * define what that state means in terms of this object being acquired
 * or released. Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated <tt>int</tt>
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
 *
 * <p>Subclasses should be defined as non-public internal helper
 * classes that are used to implement the synchronization properties
 * of their enclosing class. Class
 * <tt>AbstractQueuedSynchronizer</tt> does not implement any
 * synchronization interface. Instead it defines methods such as
 * {@link #acquireInterruptibly} that can be invoked as
 * appropriate by concrete locks and related synchronizers to
 * implement their public methods.
 *
 * <p>This class supports either or both a default <em>exclusive</em>
 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed.
 * Shared mode acquires by multiple threads may (but need not) succeed.
 * This class does not "understand" these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue. Usually, implementation subclasses support only
 * one of these modes, but both can come into play for example in a
 * {@link ReadWriteLock}. Subclasses that support only exclusive or
 * only shared modes need not define the methods supporting the unused mode.
 *
 * <p>This class defines a nested {@link ConditionObject} class that
 * can be used as a {@link Condition} implementation by subclasses
 * supporting exclusive mode for which method {@link
 * #isHeldExclusively} reports whether synchronization is exclusively
 * held with respect to the current thread, method {@link #release}
 * invoked with the current {@link #getState} value fully releases
 * this object, and {@link #acquire}, given this saved state value,
 * eventually restores this object to its previous acquired state. No
 * <tt>AbstractQueuedSynchronizer</tt> method otherwise creates such a
 * condition, so if this constraint cannot be met, do not use it. The
 * behavior of {@link ConditionObject} depends of course on the
 * semantics of its synchronizer implementation.
 *
 * <p>This class provides inspection, instrumentation, and monitoring
 * methods for the internal queue, as well as similar methods for
 * condition objects. These can be exported as desired into classes
 * using an <tt>AbstractQueuedSynchronizer</tt> for their
 * synchronization mechanics.
 *
 * <p>Serialization of this class stores only the underlying atomic
 * integer maintaining state, so deserialized objects have empty
 * thread queues.
 * Typical subclasses requiring serializability will
 * define a <tt>readObject</tt> method that restores this to a known
 * initial state upon deserialization.
 *
 * <h3>Usage</h3>
 *
 * <p>To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 *</ul>
 *
 * Each of these methods by default throws {@link
 * UnsupportedOperationException}. Implementations of these methods
 * must be internally thread-safe, and should in general be short and
 * not block. Defining these methods is the <em>only</em> supported
 * means of using this class. All other methods are declared
 * <tt>final</tt> because they cannot be independently varied.
 *
 * <p>You may also find the inherited methods from {@link
 * AbstractOwnableSynchronizer} useful to keep track of the thread
 * owning an exclusive synchronizer. You are encouraged to use them
 * -- this enables monitoring and diagnostic tools to assist users in
 * determining which threads hold locks.
 *
 * <p>Even though this class is based on an internal FIFO queue, it
 * does not automatically enforce FIFO acquisition policies. The core
 * of exclusive synchronization takes the form:
 *
 * <pre>
 * Acquire:
 *     while (!tryAcquire(arg)) {
 *        <em>enqueue thread if it is not already queued</em>;
 *        <em>possibly block current thread</em>;
 *     }
 *
 * Release:
 *     if (tryRelease(arg))
 *        <em>unblock the first queued thread</em>;
 * </pre>
 *
 * (Shared mode is similar but may involve cascading signals.)
 *
 * <p>Because checks in acquire are invoked before enqueuing, a newly
 * acquiring thread may <em>barge</em> ahead of others that are
 * blocked and queued. However, you can, if desired, define
 * <tt>tryAcquire</tt> and/or <tt>tryAcquireShared</tt> to disable
 * barging by internally invoking one or more of the inspection
 * methods. In particular, a strict FIFO lock can define
 * <tt>tryAcquire</tt> to immediately return <tt>false</tt> if {@link
 * #getFirstQueuedThread} does not return the current thread. A
 * normally preferable non-strict fair version can immediately return
 * <tt>false</tt> only if {@link #hasQueuedThreads} returns
 * <tt>true</tt> and <tt>getFirstQueuedThread</tt> is not the current
 * thread; or equivalently, that <tt>getFirstQueuedThread</tt> is both
 * non-null and not the current thread. Further variations are
 * possible.
 *
 * <p>Throughput and scalability are generally highest for the
 * default barging (also known as <em>greedy</em>,
 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
 * While this is not guaranteed to be fair or starvation-free, earlier
 * queued threads are allowed to recontend before later queued
 * threads, and each recontention has an unbiased chance to succeed
 * against incoming threads. Also, while acquires do not
 * "spin" in the usual sense, they may perform multiple
 * invocations of <tt>tryAcquire</tt> interspersed with other
 * computations before blocking. This gives most of the benefits of
 * spins when exclusive synchronization is only briefly held, without
 * most of the liabilities when it isn't. If so desired, you can
 * augment this by preceding calls to acquire methods with
 * "fast-path" checks, possibly prechecking {@link #hasContended}
 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
 * is likely not to be contended.
 *
 * <p>This class provides an efficient and scalable basis for
 * synchronization in part by specializing its range of use to
 * synchronizers that can rely on <tt>int</tt> state, acquire, and
 * release parameters, and an internal FIFO wait queue. When this does
 * not suffice, you can build synchronizers from a lower level using
 * {@link java.util.concurrent.atomic atomic} classes, your own custom
 * {@link java.util.Queue} classes, and {@link LockSupport} blocking
 * support.
 *
 * <h3>Usage Examples</h3>
 *
 * <p>Here is a non-reentrant mutual exclusion lock class that uses
 * the value zero to represent the unlocked state, and one to
 * represent the locked state. While a non-reentrant lock
 * does not strictly require recording of the current owner
 * thread, this class does so anyway to make usage easier to monitor.
 * It also supports conditions and exposes
 * one of the instrumentation methods:
 *
 * <pre>
 * class Mutex implements Lock, java.io.Serializable {
 *
 *   // Our internal helper class
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     // Report whether in locked state
 *     protected boolean isHeldExclusively() {
 *       return getState() == 1;
 *     }
 *
 *     // Acquire the lock if state is zero
 *     public boolean tryAcquire(int acquires) {
 *       assert acquires == 1; // Otherwise unused
 *       if (compareAndSetState(0, 1)) {
 *         setExclusiveOwnerThread(Thread.currentThread());
 *         return true;
 *       }
 *       return false;
 *     }
 *
 *     // Release the lock by setting state to zero
 *     protected boolean tryRelease(int releases) {
 *       assert releases == 1; // Otherwise unused
 *       if (getState() == 0) throw new IllegalMonitorStateException();
 *       setExclusiveOwnerThread(null);
 *       setState(0);
 *       return true;
 *     }
 *
 *     // Provide a Condition
 *     Condition newCondition() { return new ConditionObject(); }
 *
 *     // Deserialize properly
 *     private void readObject(ObjectInputStream s)
 *         throws IOException, ClassNotFoundException {
 *       s.defaultReadObject();
 *       setState(0); // reset to unlocked state
 *     }
 *   }
 *
 *   // The sync object does all the hard work. We just forward to it.
 *   private final Sync sync = new Sync();
 *
 *   public void lock()                { sync.acquire(1); }
 *   public boolean tryLock()          { return sync.tryAcquire(1); }
 *   public void unlock()              { sync.release(1); }
 *   public Condition newCondition()   { return sync.newCondition(); }
 *   public boolean isLocked()         { return sync.isHeldExclusively(); }
 *   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
 *   public void lockInterruptibly() throws InterruptedException {
 *     sync.acquireInterruptibly(1);
 *   }
 *   public boolean tryLock(long timeout, TimeUnit unit)
 *       throws InterruptedException {
 *     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
 *   }
 * }
 * </pre>
 *
 * <p>Here is a latch class that is like a {@link CountDownLatch}
 * except that it only requires a single <tt>signal</tt> to
 * fire. Because a latch is non-exclusive, it uses the <tt>shared</tt>
 * acquire and release methods.
 *
 * <pre>
 * class BooleanLatch {
 *
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     boolean isSignalled() { return getState() != 0; }
 *
 *     protected int tryAcquireShared(int ignore) {
 *       return isSignalled() ? 1 : -1;
 *     }
 *
 *     protected boolean tryReleaseShared(int ignore) {
 *       setState(1);
 *       return true;
 *     }
 *   }
 *
 *   private final Sync sync = new Sync();
 *   public boolean isSignalled() { return sync.isSignalled(); }
 *   public void signal()         { sync.releaseShared(1); }
 *   public void await() throws InterruptedException {
 *     sync.acquireSharedInterruptibly(1);
 *   }
 * }
 * </pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * Creates a new <tt>AbstractQueuedSynchronizer</tt> instance
     * with initial synchronization state of zero.
     */
    protected AbstractQueuedSynchronizer() { }

    /**
     * Wait queue node class.
     *
     * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue. CLH locks are normally used for
     * spinlocks. We instead use them for blocking synchronizers, but
     * use the same basic tactic of holding some of the control
     * information about a thread in the predecessor of its node. A
     * "status" field in each node keeps track of whether a thread
     * should block. A node is signalled when its predecessor
     * releases. Each node of the queue otherwise serves as a
     * specific-notification-style monitor holding a single waiting
     * thread. The status field does NOT control whether threads are
     * granted locks etc though. A thread may try to acquire if it is
     * first in the queue. But being first does not guarantee success;
     * it only gives the right to contend. So the currently released
     * contender thread may need to rewait.
     *
     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     *
     * <p>Insertion into a CLH queue requires only a single atomic
     * operation on "tail", so there is a simple atomic point of
     * demarcation from unqueued to queued. Similarly, dequeuing
     * involves only updating the "head". However, it takes a bit
     * more work for nodes to determine who their successors are,
     * in part to deal with possible cancellation due to timeouts
     * and interrupts.
     *
     * <p>The "prev" links (not used in original CLH locks), are mainly
     * needed to handle cancellation. If a node is cancelled, its
     * successor is (normally) relinked to a non-cancelled
     * predecessor. For explanation of similar mechanics in the case
     * of spin locks, see the papers by Scott and Scherer at
     * http://www.cs.rochester.edu/u/scott/synchronization/
     *
     * <p>We also use "next" links to implement blocking mechanics.
     * The thread id for each node is kept in its own node, so a
     * predecessor signals the next node to wake up by traversing
     * next link to determine which thread it is. Determination of
     * successor must avoid races with newly queued nodes to set
     * the "next" fields of their predecessors. This is solved
     * when necessary by checking backwards from the atomically
     * updated "tail" when a node's successor appears to be null.
     * (Or, said differently, the next-links are an optimization
     * so that we don't usually need a backward scan.)
     *
     * <p>Cancellation introduces some conservatism to the basic
     * algorithms. Since we must poll for cancellation of other
     * nodes, we can miss noticing whether a cancelled node is
     * ahead or behind us. This is dealt with by always unparking
     * successors upon cancellation, allowing them to stabilize on
     * a new predecessor.
     *
     * <p>CLH queues need a dummy header node to get started. But
     * we don't create them on construction, because it would be wasted
     * effort if there is never contention. Instead, the node
     * is constructed and head and tail pointers are set upon first
     * contention.
     *
     * <p>Threads waiting on Conditions use the same nodes, but
     * use an additional link. Conditions only need to link nodes
     * in simple (non-concurrent) linked queues because they are
     * only accessed when exclusively held. Upon await, a node is
     * inserted into a condition queue. Upon signal, the node is
     * transferred to the main queue. A special value of status
     * field is used to mark which queue a node is on.
     *
     * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
     * Scherer and Michael Scott, along with members of JSR-166
     * expert group, for helpful ideas, discussions, and critiques
     * on the design of this class.
     */
    static final class Node {
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node until
         *               transferred. (Use of this value here
         *               has nothing to do with the other uses
         *               of the field, but simplifies mechanics.)
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes. It is modified only using
         * CAS.
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing. Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned once during enqueuing, and
         * nulled out (for sake of GC) when no longer needed. Upon
         * cancellation, we cannot adjust this field, but can notice
         * status and bypass the node if cancelled. The enq operation
         * does not assign next field of a predecessor until after
         * attachment, so seeing a null next field does not
         * necessarily mean that node is at end of queue.
         * However, if a next field appears to be null, we can scan
         * prev's from the tail to double-check.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node. Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED. Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if
         * null. Use when predecessor cannot be null.
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    /**
     * Head of the wait queue, lazily initialized. Except for
     * initialization, it is modified only via method setHead. Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.
     * Modified only via method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a <tt>volatile</tt> read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // Queuing utilities

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev = h;
                if (compareAndSetHead(h)) {
                    tail = node;
                    return h;
                }
            }
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods. Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * Try to clear status in anticipation of signalling. It is
         * OK if this fails or if status is changed by waiting thread.
         */
        compareAndSetWaitStatus(node, Node.SIGNAL, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.
         * But if cancelled or apparently null, traverse backwards
         * from tail to find the actual non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if propagate > 0.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        setHead(node);
        if (propagate > 0 && node.waitStatus != 0) {
            /*
             * Don't bother fully figuring out successor. If it
             * looks null, call unparkSuccessor anyway to be safe.
             */
            Node s = node.next;
            if (s == null || s.isShared())
                unparkSuccessor(node);
        }
    }

    // Utilities for various versions of acquire

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        if (node != null) { // Ignore if node doesn't exist
            node.thread = null;
            // Can use unconditional write instead of CAS here
            node.waitStatus = Node.CANCELLED;
            unparkSuccessor(node);
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.
     * Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int s = pred.waitStatus;
        if (s < 0)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;
        if (s > 0)
            /*
             * Predecessor was cancelled. Move up to its predecessor
             * and indicate retry.
             */
            node.prev = pred.prev;
        else
            /*
             * Indicate that we need a signal, but don't park yet. Caller
             * will need to retry to make sure it cannot acquire before
             * parking.
             */
            compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
        return false;
    }

    /**
     * Convenience method to interrupt current thread.
     */
    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /*
     * Various flavors of acquire, varying in exclusive/shared and
     * control modes. Each is mostly the same, but annoyingly
     * different. Only a little bit of factoring is possible due to
     * interactions of exception mechanics (including ensuring that we
     * cancel if tryAcquire throws exception) and other control, at
     * least not without hurting performance too much.
     */

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

    /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    /**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                if (nanosTimeout <= 0) {
                    cancelAcquire(node);
                    return false;
                }
                if (nanosTimeout > spinForTimeoutThreshold &&
                    shouldParkAfterFailedAcquire(p, node))
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    /**
     * Acquires in shared timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {

        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                if (nanosTimeout <= 0) {
                    cancelAcquire(node);
                    return false;
                }
                if (nanosTimeout > spinForTimeoutThreshold &&
                    shouldParkAfterFailedAcquire(p, node))
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    // Main exported methods

    /**
     * Attempts to acquire in exclusive
     * mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.) Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in shared mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this release of shared mode may permit a
     *         waiting acquire (shared or exclusive) to succeed; and
     *         {@code false} otherwise
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns {@code true} if synchronization is held exclusively with
     * respect to the current (calling) thread. This method is invoked
     * upon each call to a non-waiting {@link ConditionObject} method.
     * (Waiting methods instead invoke {@link #release}.)
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     * This method is invoked
     * internally only within {@link ConditionObject} methods, so need
     * not be defined if conditions are not used.
     *
     * @return {@code true} if synchronization is held exclusively;
     *         {@code false} otherwise
     * @throws UnsupportedOperationException if conditions are not supported
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /**
     * Acquires in exclusive mode, ignoring interrupts. Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success. This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success. Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted. This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * Attempts to acquire in exclusive mode, aborting if interrupted,
     * and failing if the given timeout elapses. Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquire}, returning on success. Otherwise, the thread is
     * queued, possibly repeatedly blocking and unblocking, invoking
     * {@link #tryAcquire} until success or the thread is interrupted
     * or the timeout elapses. This method can be used to implement
     * method {@link Lock#tryLock(long, TimeUnit)}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * Releases in exclusive mode. Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument. This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * Acquires in shared mode, ignoring interrupts. Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Acquires in shared mode, aborting if interrupted. Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success. Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * Attempts to acquire in shared mode, aborting if interrupted, and
     * failing if the given timeout elapses.
     * Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquireShared}, returning on success. Otherwise, the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted or the timeout elapses.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * Releases in shared mode. Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument. This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // Queue inspection methods

    /**
     * Queries whether any threads are waiting to acquire. Note that
     * because cancellations due to interrupts and timeouts may occur
     * at any time, a {@code true} return does not guarantee that any
     * other thread will ever acquire.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there may be other threads waiting to acquire
     */
    public final boolean hasQueuedThreads() {
        return head != tail;
    }

    /**
     * Queries whether any threads have ever contended to acquire this
     * synchronizer; that is if an acquire method has ever blocked.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there has ever been contention
     */
    public final boolean hasContended() {
        return head != null;
    }

    /**
     * Returns the first (longest-waiting) thread in the queue, or
     * {@code null} if no threads are currently queued.
     *
     * <p>In this implementation, this operation normally returns in
     * constant time, but may iterate upon contention if other threads are
     * concurrently modifying the queue.
     *
     * @return the first (longest-waiting) thread in the queue, or
     *         {@code null} if no threads are currently queued
     */
    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail)? null : fullGetFirstQueuedThread();
    }

    /**
     * Version of getFirstQueuedThread called when fastpath fails
     */
    private Thread fullGetFirstQueuedThread() {
        /*
         * The first node is normally h.next. Try to get its
         * thread field, ensuring consistent reads: If thread
         * field is nulled out or s.prev is no longer head, then
         * some other thread(s) concurrently performed setHead in
         * between some of our reads. We try this twice before
         * resorting to traversal.
         */
        Node h, s;
        Thread st;
        if (((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null) ||
            ((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null))
            return st;

        /*
         * Head's next field might not have been set yet, or may have
         * been unset after setHead. So we must check to see if tail
         * is actually first node. If not, we continue on, safely
         * traversing from tail back to head to find first,
         * guaranteeing termination.
         */

        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread;
    }

    /**
     * Returns true if the given thread is currently queued.
     *
     * <p>This implementation traverses the queue to determine
     * presence of the given thread.
     *
     * @param thread the thread
     * @return {@code true} if the given thread is on the queue
     * @throws NullPointerException if the thread is null
     */
    public final boolean isQueued(Thread thread) {
        if (thread == null)
            throw new NullPointerException();
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }

    /**
     * Returns {@code true} if the apparent first queued thread, if one
     * exists, is waiting in exclusive mode. Used only as a heuristic
     * in ReentrantReadWriteLock.
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return ((h = head) != null && (s = h.next) != null &&
                s.nextWaiter != Node.SHARED);
    }

    /**
     * Return {@code true} if the queue is empty or if the given thread
     * is at the head of the queue.
     * This is reliable only if
     * <tt>current</tt> is actually Thread.currentThread() of caller.
     */
    final boolean isFirst(Thread current) {
        Node h, s;
        return ((h = head) == null ||
                ((s = h.next) != null && s.thread == current) ||
                fullIsFirst(current));
    }

    final boolean fullIsFirst(Thread current) {
        // same idea as fullGetFirstQueuedThread
        Node h, s;
        Thread firstThread = null;
        if (((h = head) != null && (s = h.next) != null &&
             s.prev == head && (firstThread = s.thread) != null))
            return firstThread == current;
        Node t = tail;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread == current || firstThread == null;
    }


    // Instrumentation and monitoring methods

    /**
     * Returns an estimate of the number of threads waiting to
     * acquire. The value is only an estimate because the number of
     * threads may change dynamically while this method traverses
     * internal data structures. This method is designed for use in
     * monitoring system state, not for synchronization
     * control.
     *
     * @return the estimated number of threads waiting to acquire
     */
    public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire. Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order. This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in exclusive mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to an exclusive acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in shared mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to a shared acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getSharedQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a string identifying this synchronizer, as well as its state.
     * The state, in brackets, includes the String {@code "State ="}
     * followed by the current value of {@link #getState}, and either
     * {@code "nonempty"} or {@code "empty"} depending on whether the
     * queue is empty.
     *
     * @return a string identifying this synchronizer, as well as its state
     */
    public String toString() {
        int s = getState();
        String q = hasQueuedThreads()? "non" : "";
        return super.toString() +
            "[State = " + s + ", " + q + "empty queue]";
    }


    // Internal support methods for Conditions

    /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if the node is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it. It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal).
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int c = p.waitStatus;
        if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    /**
     * Transfers node, if necessary, to sync queue after a cancelled
     * wait. Returns true if thread was cancelled before being
     * signalled.
     * @param node the waiting thread's node
     * @return true if cancelled before the node was signalled.
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq(). Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            if (release(savedState))
                return savedState;
        } catch (RuntimeException ex) {
            node.waitStatus = Node.CANCELLED;
            throw ex;
        }
        // reach here if release fails
        node.waitStatus = Node.CANCELLED;
        throw new IllegalMonitorStateException();
    }

    // Instrumentation methods for conditions

    /**
     * Queries whether the given ConditionObject
     * uses this synchronizer as its lock.
     *
     * @param condition the condition
     * @return <tt>true</tt> if owned
     * @throws NullPointerException if the condition is null
     */
    public final boolean owns(ConditionObject condition) {
        if (condition == null)
            throw new NullPointerException();
        return condition.isOwnedBy(this);
    }

    /**
     * Queries whether any threads are waiting on the given condition
     * associated with this synchronizer. Note that because timeouts
     * and interrupts may occur at any time, a <tt>true</tt> return
     * does not guarantee that a future <tt>signal</tt> will awaken
     * any threads. This method is designed primarily for use in
     * monitoring of the system state.
     *
     * @param condition the condition
     * @return <tt>true</tt> if there are any waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final boolean hasWaiters(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.hasWaiters();
    }

    /**
     * Returns an estimate of the number of threads waiting on the
     * given condition associated with this synchronizer. Note that
     * because timeouts and interrupts may occur at any time, the
     * estimate serves only as an upper bound on the actual number of
     * waiters. This method is designed for use in monitoring of the
     * system state, not for synchronization control.
     *
     * @param condition the condition
     * @return the estimated number of waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final int getWaitQueueLength(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitQueueLength();
    }

    /**
     * Returns a collection containing those threads that may be
     * waiting on the given condition associated with this
     * synchronizer. Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order.
     *
     * @param condition the condition
     * @return the collection of threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitingThreads();
    }

    /**
     * Condition implementation for a {@link
     * AbstractQueuedSynchronizer} serving as the basis of a {@link
     * Lock} implementation.
     *
     * <p>Method documentation for this class describes mechanics,
     * not behavioral specifications from the point of view of Lock
     * and Condition users. Exported versions of this class will in
     * general need to be accompanied by documentation describing
     * condition semantics that rely on those of the associated
     * <tt>AbstractQueuedSynchronizer</tt>.
     *
     * <p>This class is Serializable, but all fields are transient,
     * so deserialized conditions have no waiters.
     */
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new <tt>ConditionObject</tt> instance.
         */
        public ConditionObject() { }

        // Internal methods

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            Node t = lastWaiter;
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        /**
         * Removes and transfers nodes until hitting a non-cancelled
         * one or null. Split out from signal in part to encourage
         * compilers to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        /**
         * Returns true if given node is on this condition queue.
         * Call only when holding lock.
         */
        private boolean isOnConditionQueue(Node node) {
            return node.next != null || node == lastWaiter;
        }

        /**
         * Unlinks a cancelled waiter node from condition queue. This
         * is called when cancellation occurred during condition wait,
         * not lock wait, and is called only after lock has been
         * re-acquired by a cancelled waiter and the node is not known
         * to already have been dequeued. It is needed to avoid
         * garbage retention in the absence of signals.
         * So even though
         * it may require a full traversal, it comes into play only
         * when timeouts or cancellations occur in the absence of
         * signals.
         */
        private void unlinkCancelledWaiter(Node node) {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                if (t == node) {
                    Node next = t.nextWaiter;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (lastWaiter == node)
                        lastWaiter = trail;
                    break;
                }
                trail = t;
                t = t.nextWaiter;
            }
        }

        // public methods

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        /**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        /**
         * Implements uninterruptible condition wait.
         * <ol>
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * </ol>
         */
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

        /*
         * For interruptible waits, we need to track whether to throw
         * InterruptedException, if interrupted while blocked on
         * condition, versus reinterrupt current thread, if
         * interrupted while blocked waiting to re-acquire.
         */

        /** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT =  1;
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE    = -1;

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return (Thread.interrupted()) ?
                ((transferAfterCancelledWait(node))? THROW_IE : REINTERRUPT) :
                0;
        }

        /**
         * Throws InterruptedException, reinterrupts current thread, or
         * does nothing, depending on mode.
         */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw exception
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (isOnConditionQueue(node))
                unlinkCancelledWaiter(node);
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException
         * </ol>
         */
        public final long awaitNanos(long nanosTimeout) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            long lastTime = System.nanoTime();
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;

                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (isOnConditionQueue(node))
                unlinkCancelledWaiter(node);
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return nanosTimeout - (System.nanoTime() - lastTime);
        }

        /**
         * Implements absolute timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException
         * <li> If timed out while blocked in step 4, return false, else true
         * </ol>
         */
        public final boolean awaitUntil(Date deadline) throws InterruptedException {
            if (deadline == null)
                throw new NullPointerException();
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() > abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (isOnConditionQueue(node))
                unlinkCancelledWaiter(node);
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException
         * <li> If timed out while blocked in step 4, return false, else true
         * </ol>
         */
        public final boolean await(long time, TimeUnit unit) throws InterruptedException {
            if (unit == null)
                throw new NullPointerException();
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            long lastTime = System.nanoTime();
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (isOnConditionQueue(node))
                unlinkCancelledWaiter(node);
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        //  support for instrumentation

        /**
         * Returns true if this condition was created by the given
         * synchronization object.
         *
         * @return {@code true} if owned
         */
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        /**
         * Queries whether any threads are waiting on this condition.
         * Implements {@link AbstractQueuedSynchronizer#hasWaiters}.
         *
         * @return {@code true} if there are any waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        /**
         * Returns an estimate of the number of threads waiting on
         * this condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength}.
         *
         * @return the estimated number of waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        /**
         * Returns a collection containing those threads that may be
         * waiting on this Condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads}.
         *
         * @return the collection of threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }

    /**
     * Setup to support compareAndSet.
     * We need to natively implement
     * this here: For the sake of permitting future enhancements, we
     * cannot explicitly subclass AtomicInteger, which would be
     * efficient and useful otherwise. So, as the lesser of evils, we
     * natively implement using hotspot intrinsics API. And while we
     * are at it, we do the same for other CASable fields (which could
     * otherwise be done with atomic field updaters).
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    /**
     * CAS head field. Used only by enq
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS tail field. Used only by enq
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * CAS waitStatus field of a node.
     */
    private final static boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
}
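
The wait and signal paths of ConditionObject listed above can be exercised from client code roughly as follows. This is only a sketch and is not part of the GNU Classpath source: the names ConditionSketch, Sync, handOff, timedWaitExpires and signalWithoutLockThrows are hypothetical, and the mutex is assumed to be a minimal non-reentrant one (state 0 = unlocked, 1 = locked) built on the public java.util.concurrent.locks API.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

// Hypothetical demo class; an illustration only, not library code.
final class ConditionSketch {

    // Assumed minimal non-reentrant mutex: state 0 = unlocked, 1 = locked.
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override protected boolean tryRelease(int ignored) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // signal() and signalAll() consult this before touching the queue.
        @Override protected boolean isHeldExclusively() {
            return getState() == 1
                && getExclusiveOwnerThread() == Thread.currentThread();
        }

        Condition newCondition() { return new ConditionObject(); }
    }

    // Waiter releases the lock inside the wait, is signalled, then reacquires.
    static boolean handOff() {
        final Sync sync = new Sync();
        final Condition ready = sync.newCondition();
        final boolean[] flag = { false };
        Thread signaller = new Thread(new Runnable() {
            public void run() {
                sync.acquire(1);          // blocks until the waiter releases
                try {
                    flag[0] = true;
                    ready.signal();       // moves the waiter to the lock queue
                } finally {
                    sync.release(1);
                }
            }
        });
        sync.acquire(1);
        try {
            signaller.start();
            while (!flag[0])              // loop guards against early wakeups
                ready.awaitUninterruptibly();
            return flag[0];
        } finally {
            sync.release(1);
        }
    }

    // With no signaller, the timed wait returns false once the timeout passes.
    static boolean timedWaitExpires() {
        Sync sync = new Sync();
        Condition c = sync.newCondition();
        sync.acquire(1);
        try {
            return c.await(10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("unexpected interrupt", e);
        } finally {
            sync.release(1);
        }
    }

    // Signalling without holding the lock is rejected.
    static boolean signalWithoutLockThrows() {
        Condition c = new Sync().newCondition();
        try {
            c.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("handOff: " + handOff());
        System.out.println("timedWaitExpires: " + timedWaitExpires());
        System.out.println("signalWithoutLockThrows: " + signalWithoutLockThrows());
    }
}
```

The waiter calls awaitUninterruptibly() in a loop on the guarded flag, which is the usual pattern because a wait can return before the condition it guards actually holds. The sketch uses state value 1 for every acquire and release, so the saved state that the wait methods pass back on reacquisition is always 1.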