GNU Classpath (0.98)
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent.locks;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import sun.misc.Unsafe;

/**
 * A version of {@link AbstractQueuedSynchronizer} in
 * which synchronization state is maintained as a <tt>long</tt>.
 * This class has exactly the same structure, properties, and methods
 * as <tt>AbstractQueuedSynchronizer</tt> with the exception
 * that all state-related parameters and results are defined
 * as <tt>long</tt> rather than <tt>int</tt>. This class
 * may be useful when creating synchronizers such as
 * multilevel locks and barriers that require
 * 64 bits of state.
 *
 * <p>See {@link AbstractQueuedSynchronizer} for usage
 * notes and examples.
 *
 * @since 1.6
 * @author Doug Lea
 */
public abstract class AbstractQueuedLongSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414692L;

    /*
      To keep sources in sync, the remainder of this source file is
      exactly cloned from AbstractQueuedSynchronizer, replacing class
      name and changing ints related with sync state to longs. Please
      keep it that way.
    */

    /**
     * Creates a new <tt>AbstractQueuedLongSynchronizer</tt> instance
     * with initial synchronization state of zero.
     */
    protected AbstractQueuedLongSynchronizer() { }

    /**
     * Wait queue node class.
     *
     * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue. CLH locks are normally used for
     * spinlocks.
     * We instead use them for blocking synchronizers, but
     * use the same basic tactic of holding some of the control
     * information about a thread in the predecessor of its node. A
     * "status" field in each node keeps track of whether a thread
     * should block. A node is signalled when its predecessor
     * releases. Each node of the queue otherwise serves as a
     * specific-notification-style monitor holding a single waiting
     * thread. The status field does NOT control whether threads are
     * granted locks etc though. A thread may try to acquire if it is
     * first in the queue. But being first does not guarantee success;
     * it only gives the right to contend. So the currently released
     * contender thread may need to rewait.
     *
     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     *
     * <p>Insertion into a CLH queue requires only a single atomic
     * operation on "tail", so there is a simple atomic point of
     * demarcation from unqueued to queued. Similarly, dequeuing
     * involves only updating the "head". However, it takes a bit
     * more work for nodes to determine who their successors are,
     * in part to deal with possible cancellation due to timeouts
     * and interrupts.
     *
     * <p>The "prev" links (not used in original CLH locks) are mainly
     * needed to handle cancellation. If a node is cancelled, its
     * successor is (normally) relinked to a non-cancelled
     * predecessor. For explanation of similar mechanics in the case
     * of spin locks, see the papers by Scott and Scherer at
     * http://www.cs.rochester.edu/u/scott/synchronization/
     *
     * <p>We also use "next" links to implement blocking mechanics.
     * The thread id for each node is kept in its own node, so a
     * predecessor signals the next node to wake up by traversing
     * next link to determine which thread it is. Determination of
     * successor must avoid races with newly queued nodes to set
     * the "next" fields of their predecessors. This is solved
     * when necessary by checking backwards from the atomically
     * updated "tail" when a node's successor appears to be null.
     * (Or, said differently, the next-links are an optimization
     * so that we don't usually need a backward scan.)
     *
     * <p>Cancellation introduces some conservatism to the basic
     * algorithms. Since we must poll for cancellation of other
     * nodes, we can miss noticing whether a cancelled node is
     * ahead or behind us. This is dealt with by always unparking
     * successors upon cancellation, allowing them to stabilize on
     * a new predecessor.
     *
     * <p>CLH queues need a dummy header node to get started. But
     * we don't create them on construction, because it would be wasted
     * effort if there is never contention. Instead, the node
     * is constructed and head and tail pointers are set upon first
     * contention.
     *
     * <p>Threads waiting on Conditions use the same nodes, but
     * use an additional link. Conditions only need to link nodes
     * in simple (non-concurrent) linked queues because they are
     * only accessed when exclusively held. Upon await, a node is
     * inserted into a condition queue. Upon signal, the node is
     * transferred to the main queue. A special value of status
     * field is used to mark which queue a node is on.
     *
     * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
     * Scherer and Michael Scott, along with members of JSR-166
     * expert group, for helpful ideas, discussions, and critiques
     * on the design of this class.
     */
    static final class Node {
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node until
         *               transferred. (Use of this value here
         *               has nothing to do with the other uses
         *               of the field, but simplifies mechanics.)
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes. It is modified only using
         * CAS.
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus.
         * Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing. Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned once during enqueuing, and
         * nulled out (for sake of GC) when no longer needed. Upon
         * cancellation, we cannot adjust this field, but can notice
         * status and bypass the node if cancelled. The enq operation
         * does not assign next field of a predecessor until after
         * attachment, so seeing a null next field does not
         * necessarily mean that node is at end of queue. However, if
         * a next field appears to be null, we can scan prev's from
         * the tail to double-check.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node. Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED. Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if
         * null. Use when predecessor cannot be null.
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    /**
     * Head of the wait queue, lazily initialized. Except for
     * initialization, it is modified only via method setHead. Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized. Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile long state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> read.
     * @return current state value
     */
    protected final long getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> write.
     * @param newState the new state value
     */
    protected final void setState(long newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a <tt>volatile</tt> read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(long expect, long update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapLong(this, stateOffset, expect, update);
    }

    // Queuing utilities

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev = h;
                if (compareAndSetHead(h)) {
                    tail = node;
                    return h;
                }
            }
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for given thread and mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; back up to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods. Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * Try to clear status in anticipation of signalling. It is
         * OK if this fails or if status is changed by waiting thread.
         */
        compareAndSetWaitStatus(node, Node.SIGNAL, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node. But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if propagate > 0.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, long propagate) {
        setHead(node);
        if (propagate > 0 && node.waitStatus != 0) {
            /*
             * Don't bother fully figuring out successor. If it
             * looks null, call unparkSuccessor anyway to be safe.
             */
            Node s = node.next;
            if (s == null || s.isShared())
                unparkSuccessor(node);
        }
    }

    // Utilities for various versions of acquire

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        if (node != null) { // Ignore if node doesn't exist
            node.thread = null;
            // Can use unconditional write instead of CAS here
            node.waitStatus = Node.CANCELLED;
            unparkSuccessor(node);
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops. Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int s = pred.waitStatus;
        if (s < 0)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;
        if (s > 0)
            /*
             * Predecessor was cancelled. Move up to its predecessor
             * and indicate retry.
             */
            node.prev = pred.prev;
        else
            /*
             * Indicate that we need a signal, but don't park yet. Caller
             * will need to retry to make sure it cannot acquire before
             * parking.
             */
            compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
        return false;
    }

    /**
     * Convenience method to interrupt current thread.
     */
    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /*
     * Various flavors of acquire, varying in exclusive/shared and
     * control modes. Each is mostly the same, but annoyingly
     * different. Only a little bit of factoring is possible due to
     * interactions of exception mechanics (including ensuring that we
     * cancel if tryAcquire throws exception) and other control, at
     * least not without hurting performance too much.
     */

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, long arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

    /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(long arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    /**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(long arg, long nanosTimeout)
        throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                if (nanosTimeout <= 0) {
                    cancelAcquire(node);
                    return false;
                }
                if (nanosTimeout > spinForTimeoutThreshold &&
                    shouldParkAfterFailedAcquire(p, node))
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(long arg) {
        final Node node = addWaiter(Node.SHARED);
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    long r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(long arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    long r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    /**
     * Acquires in shared timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
        throws InterruptedException {

        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    long r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                if (nanosTimeout <= 0) {
                    cancelAcquire(node);
                    return false;
                }
                if (nanosTimeout > spinForTimeoutThreshold &&
                    shouldParkAfterFailedAcquire(p, node))
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

    // Main exported methods

    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.
     *        The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(long arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(long arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.
     * If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.) Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected long tryAcquireShared(long arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in shared mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.
     *        The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this release of shared mode may permit a
     *         waiting acquire (shared or exclusive) to succeed; and
     *         {@code false} otherwise
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected boolean tryReleaseShared(long arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns {@code true} if synchronization is held exclusively with
     * respect to the current (calling) thread. This method is invoked
     * upon each call to a non-waiting {@link ConditionObject} method.
     * (Waiting methods instead invoke {@link #release}.)
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}. This method is invoked
     * internally only within {@link ConditionObject} methods, so need
     * not be defined if conditions are not used.
     *
     * @return {@code true} if synchronization is held exclusively;
     *         {@code false} otherwise
     * @throws UnsupportedOperationException if conditions are not supported
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /**
     * Acquires in exclusive mode, ignoring interrupts. Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success. This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.
     *        This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(long arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success. Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted. This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(long arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * Attempts to acquire in exclusive mode, aborting if interrupted,
     * and failing if the given timeout elapses. Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquire}, returning on success. Otherwise, the thread is
     * queued, possibly repeatedly blocking and unblocking, invoking
     * {@link #tryAcquire} until success or the thread is interrupted
     * or the timeout elapses. This method can be used to implement
     * method {@link Lock#tryLock(long, TimeUnit)}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireNanos(long arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * Releases in exclusive mode. Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument. This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(long arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * Acquires in shared mode, ignoring interrupts. Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(long arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Acquires in shared mode, aborting if interrupted. Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.
     * Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(long arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * Attempts to acquire in shared mode, aborting if interrupted, and
     * failing if the given timeout elapses. Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquireShared}, returning on success. Otherwise, the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted or the timeout elapses.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * Releases in shared mode. Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument. This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(long arg) {
        if (tryReleaseShared(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // Queue inspection methods

    /**
     * Queries whether any threads are waiting to acquire. Note that
     * because cancellations due to interrupts and timeouts may occur
     * at any time, a {@code true} return does not guarantee that any
     * other thread will ever acquire.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there may be other threads waiting to acquire
     */
    public final boolean hasQueuedThreads() {
        return head != tail;
    }

    /**
     * Queries whether any threads have ever contended to acquire this
     * synchronizer; that is if an acquire method has ever blocked.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there has ever been contention
     */
    public final boolean hasContended() {
        return head != null;
    }

    /**
     * Returns the first (longest-waiting) thread in the queue, or
     * {@code null} if no threads are currently queued.
     *
     * <p>In this implementation, this operation normally returns in
     * constant time, but may iterate upon contention if other threads are
     * concurrently modifying the queue.
     *
     * @return the first (longest-waiting) thread in the queue, or
     *         {@code null} if no threads are currently queued
     */
    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }

    /**
     * Version of getFirstQueuedThread called when fastpath fails
     */
    private Thread fullGetFirstQueuedThread() {
        /*
         * The first node is normally h.next. Try to get its
         * thread field, ensuring consistent reads: If thread
         * field is nulled out or s.prev is no longer head, then
         * some other thread(s) concurrently performed setHead in
         * between some of our reads. We try this twice before
         * resorting to traversal.
         */
        Node h, s;
        Thread st;
        if (((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null) ||
            ((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null))
            return st;

        /*
         * Head's next field might not have been set yet, or may have
         * been unset after setHead. So we must check to see if tail
         * is actually first node. If not, we continue on, safely
         * traversing from tail back to head to find first,
         * guaranteeing termination.
         */

        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread;
    }

    /**
     * Returns true if the given thread is currently queued.
     *
     * <p>This implementation traverses the queue to determine
     * presence of the given thread.
     *
     * @param thread the thread
     * @return {@code true} if the given thread is on the queue
     * @throws NullPointerException if the thread is null
     */
    public final boolean isQueued(Thread thread) {
        if (thread == null)
            throw new NullPointerException();
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }

    /**
     * Return {@code true} if the apparent first queued thread, if one
     * exists, is waiting in exclusive mode. Used only as a heuristic
     * in ReentrantReadWriteLock.
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return ((h = head) != null && (s = h.next) != null &&
                s.nextWaiter != Node.SHARED);
    }

    /**
     * Return {@code true} if the queue is empty or if the given thread
     * is at the head of the queue. This is reliable only if
     * <tt>current</tt> is actually Thread.currentThread() of caller.
     */
    final boolean isFirst(Thread current) {
        Node h, s;
        return ((h = head) == null ||
                ((s = h.next) != null && s.thread == current) ||
                fullIsFirst(current));
    }

    final boolean fullIsFirst(Thread current) {
        // same idea as fullGetFirstQueuedThread
        Node h, s;
        Thread firstThread = null;
        if (((h = head) != null && (s = h.next) != null &&
             s.prev == head && (firstThread = s.thread) != null))
            return firstThread == current;
        Node t = tail;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread == current || firstThread == null;
    }


    // Instrumentation and monitoring methods

    /**
     * Returns an estimate of the number of threads waiting to
     * acquire.
     * The value is only an estimate because the number of
     * threads may change dynamically while this method traverses
     * internal data structures. This method is designed for use in
     * monitoring system state, not for synchronization
     * control.
     *
     * @return the estimated number of threads waiting to acquire
     */
    public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire. Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order. This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in exclusive mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to an exclusive acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in shared mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to a shared acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getSharedQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a string identifying this synchronizer, as well as its state.
     * The state, in brackets, includes the String {@code "State ="}
     * followed by the current value of {@link #getState}, and either
     * {@code "nonempty"} or {@code "empty"} depending on whether the
     * queue is empty.
     *
     * @return a string identifying this synchronizer, as well as its state
     */
    public String toString() {
        long s = getState();
        String q = hasQueuedThreads() ? "non" : "";
        return super.toString() +
            "[State = " + s + ", " + q + "empty queue]";
    }


    // Internal support methods for Conditions

    /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it. It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal).
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int c = p.waitStatus;
        if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    /**
     * Transfers node, if necessary, to sync queue after a cancelled
     * wait. Returns true if thread was cancelled before being
     * signalled.
     * @param node the node of the waiting thread
     * @return true if cancelled before the node was signalled.
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq(). Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final long fullyRelease(Node node) {
        try {
            long savedState = getState();
            if (release(savedState))
                return savedState;
        } catch (RuntimeException ex) {
            node.waitStatus = Node.CANCELLED;
            throw ex;
        }
        // reach here if release fails
        node.waitStatus = Node.CANCELLED;
        throw new IllegalMonitorStateException();
    }

    // Instrumentation methods for conditions

    /**
     * Queries whether the given ConditionObject
     * uses this synchronizer as its lock.
     *
     * @param condition the condition
     * @return <tt>true</tt> if owned
     * @throws NullPointerException if the condition is null
     */
    public final boolean owns(ConditionObject condition) {
        if (condition == null)
            throw new NullPointerException();
        return condition.isOwnedBy(this);
    }

    /**
     * Queries whether any threads are waiting on the given condition
     * associated with this synchronizer. Note that because timeouts
     * and interrupts may occur at any time, a <tt>true</tt> return
     * does not guarantee that a future <tt>signal</tt> will awaken
     * any threads. This method is designed primarily for use in
     * monitoring of the system state.
     *
     * @param condition the condition
     * @return <tt>true</tt> if there are any waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final boolean hasWaiters(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.hasWaiters();
    }

    /**
     * Returns an estimate of the number of threads waiting on the
     * given condition associated with this synchronizer. Note that
     * because timeouts and interrupts may occur at any time, the
     * estimate serves only as an upper bound on the actual number of
     * waiters. This method is designed for use in monitoring of the
     * system state, not for synchronization control.
     *
     * @param condition the condition
     * @return the estimated number of waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final int getWaitQueueLength(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitQueueLength();
    }

    /**
     * Returns a collection containing those threads that may be
     * waiting on the given condition associated with this
     * synchronizer. Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order.
     *
     * @param condition the condition
     * @return the collection of threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitingThreads();
    }

    /**
     * Condition implementation for a {@link
     * AbstractQueuedLongSynchronizer} serving as the basis of a {@link
     * Lock} implementation.
     *
     * <p>Method documentation for this class describes mechanics,
     * not behavioral specifications from the point of view of Lock
     * and Condition users.
     * Exported versions of this class will in
     * general need to be accompanied by documentation describing
     * condition semantics that rely on those of the associated
     * <tt>AbstractQueuedLongSynchronizer</tt>.
     *
     * <p>This class is Serializable, but all fields are transient,
     * so deserialized conditions have no waiters.
     *
     * @since 1.6
     */
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new <tt>ConditionObject</tt> instance.
         */
        public ConditionObject() { }

        // Internal methods

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            Node t = lastWaiter;
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ((firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        /**
         * Returns true if given node is on this condition queue.
         * Call only when holding lock.
         */
        private boolean isOnConditionQueue(Node node) {
            return node.next != null || node == lastWaiter;
        }

        /**
         * Unlinks a cancelled waiter node from condition queue. This
         * is called when cancellation occurred during condition wait,
         * not lock wait, and is called only after lock has been
         * re-acquired by a cancelled waiter and the node is not known
         * to already have been dequeued. It is needed to avoid
         * garbage retention in the absence of signals. So even though
         * it may require a full traversal, it comes into play only
         * when timeouts or cancellations occur in the absence of
         * signals.
         */
        private void unlinkCancelledWaiter(Node node) {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                if (t == node) {
                    Node next = t.nextWaiter;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (lastWaiter == node)
                        lastWaiter = trail;
                    break;
                }
                trail = t;
                t = t.nextWaiter;
            }
        }

        // public methods

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        /**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        /**
         * Implements uninterruptible condition wait.
         * <ol>
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * </ol>
         */
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

        /*
         * For interruptible waits, we need to track whether to throw
         * InterruptedException, if interrupted while blocked on
         * condition, versus reinterrupt current thread, if
         * interrupted while blocked waiting to re-acquire.
         */

        /** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT = 1;
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE = -1;

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return (Thread.interrupted()) ?
                ((transferAfterCancelledWait(node)) ? THROW_IE : REINTERRUPT) :
                0;
        }

        /**
         * Throws InterruptedException, reinterrupts current thread, or
         * does nothing, depending on mode.
         */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw exception
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (isOnConditionQueue(node))
                unlinkCancelledWaiter(node);
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException
         * </ol>
         */
        public final long awaitNanos(long nanosTimeout) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            long lastTime = System.nanoTime();
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;

                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (isOnConditionQueue(node))
                unlinkCancelledWaiter(node);
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return nanosTimeout - (System.nanoTime() - lastTime);
        }

        /**
         * Implements absolute timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException
         * <li> If timed out while blocked in step 4, return false, else true
         * </ol>
         */
        public final boolean awaitUntil(Date deadline) throws InterruptedException {
            if (deadline == null)
                throw new NullPointerException();
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() > abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (isOnConditionQueue(node))
                unlinkCancelledWaiter(node);
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException
         * <li> Save lock state returned by {@link #getState}
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException
         * <li> If timed out while blocked in step 4, return false, else true
         * </ol>
         */
        public final boolean await(long time, TimeUnit unit) throws InterruptedException {
            if (unit == null)
                throw new NullPointerException();
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            long lastTime = System.nanoTime();
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (isOnConditionQueue(node))
                unlinkCancelledWaiter(node);
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        // support for instrumentation

        /**
         * Returns true if this condition was created by the given
         * synchronization object.
         *
         * @return {@code true} if owned
         */
        final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
            return sync == AbstractQueuedLongSynchronizer.this;
        }

        /**
         * Queries whether any threads are waiting on this condition.
         * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters}.
         *
         * @return {@code true} if there are any waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        /**
         * Returns an estimate of the number of threads waiting on
         * this condition.
         * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength}.
         *
         * @return the estimated number of waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        /**
         * Returns a collection containing those threads that may be
         * waiting on this Condition.
         * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads}.
         *
         * @return the collection of threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }

    /**
     * Setup to support compareAndSet.
     * We need to natively implement
     * this here: For the sake of permitting future enhancements, we
     * cannot explicitly subclass AtomicLong, which would be
     * efficient and useful otherwise. So, as the lesser of evils, we
     * natively implement using hotspot intrinsics API. And while we
     * are at it, we do the same for other CASable fields (which could
     * otherwise be done with atomic field updaters).
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedLongSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedLongSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedLongSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    /**
     * CAS head field. Used only by enq
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS tail field. Used only by enq
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * CAS waitStatus field of a node.
     */
    private final static boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
}
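
The class comment defers to AbstractQueuedSynchronizer for usage notes, so a small illustration of how the public acquire and release methods above are typically driven may help. The following is only a minimal sketch, assuming a non-reentrant mutex where state 0 means unlocked and 1 means locked. The names LongMutex and Sync are hypothetical and are not part of this source file or of any library.

```java
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;

// Hypothetical example class: a non-reentrant mutex layered on the
// exclusive-mode methods (acquire, release) of the synchronizer.
final class LongMutex {

    // Assumption for this sketch: state 0L = unlocked, 1L = locked.
    private static final class Sync extends AbstractQueuedLongSynchronizer {
        private static final long serialVersionUID = 1L;

        @Override
        protected boolean tryAcquire(long arg) {
            // Succeeds only if the state moves atomically from 0 to 1.
            if (compareAndSetState(0L, 1L)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(long arg) {
            if (getState() == 0L)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0L);
            // Returning true lets release() unpark a queued successor.
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    // Blocks (queueing the thread if needed) until the mutex is acquired.
    void lock() { sync.acquire(1L); }

    // Single non-blocking attempt; never queues the calling thread.
    boolean tryLock() { return sync.tryAcquire(1L); }

    // Releases the mutex and wakes the first queued waiter, if any.
    void unlock() { sync.release(1L); }

    // True if other threads may be waiting to acquire.
    boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
}
```

In this sketch the argument passed to acquire and release is ignored by the subclass, which matches the Javadoc note that the value is otherwise uninterpreted. A synchronizer that actually needs 64 bits of state would encode its own meaning in that argument and in the state field.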