GNU Classpath (0.98) | |
Frames | No Frames |
1: /* 2: * Written by Doug Lea with assistance from members of JCP JSR-166 3: * Expert Group and released to the public domain, as explained at 4: * http://creativecommons.org/licenses/publicdomain 5: */ 6: 7: package java.util.concurrent; 8: import java.util.concurrent.locks.*; 9: import java.util.*; 10: 11: /** 12: * A bounded {@linkplain BlockingQueue blocking queue} backed by an 13: * array. This queue orders elements FIFO (first-in-first-out). The 14: * <em>head</em> of the queue is that element that has been on the 15: * queue the longest time. The <em>tail</em> of the queue is that 16: * element that has been on the queue the shortest time. New elements 17: * are inserted at the tail of the queue, and the queue retrieval 18: * operations obtain elements at the head of the queue. 19: * 20: * <p>This is a classic "bounded buffer", in which a 21: * fixed-sized array holds elements inserted by producers and 22: * extracted by consumers. Once created, the capacity cannot be 23: * increased. Attempts to <tt>put</tt> an element into a full queue 24: * will result in the operation blocking; attempts to <tt>take</tt> an 25: * element from an empty queue will similarly block. 26: * 27: * <p> This class supports an optional fairness policy for ordering 28: * waiting producer and consumer threads. By default, this ordering 29: * is not guaranteed. However, a queue constructed with fairness set 30: * to <tt>true</tt> grants threads access in FIFO order. Fairness 31: * generally decreases throughput but reduces variability and avoids 32: * starvation. 33: * 34: * <p>This class and its iterator implement all of the 35: * <em>optional</em> methods of the {@link Collection} and {@link 36: * Iterator} interfaces. 37: * 38: * <p>This class is a member of the 39: * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 40: * Java Collections Framework</a>. 41: * 42: * @since 1.5 43: * @author Doug Lea 44: * @param <E> the type of elements held in this collection 45: */ 46: public class ArrayBlockingQueue<E> extends AbstractQueue<E> 47: implements BlockingQueue<E>, java.io.Serializable { 48: 49: /** 50: * Serialization ID. This class relies on default serialization 51: * even for the items array, which is default-serialized, even if 52: * it is empty. Otherwise it could not be declared final, which is 53: * necessary here. 54: */ 55: private static final long serialVersionUID = -817911632652898426L; 56: 57: /** The queued items */ 58: private final E[] items; 59: /** items index for next take, poll or remove */ 60: private int takeIndex; 61: /** items index for next put, offer, or add. */ 62: private int putIndex; 63: /** Number of items in the queue */ 64: private int count; 65: 66: /* 67: * Concurrency control uses the classic two-condition algorithm 68: * found in any textbook. 69: */ 70: 71: /** Main lock guarding all access */ 72: private final ReentrantLock lock; 73: /** Condition for waiting takes */ 74: private final Condition notEmpty; 75: /** Condition for waiting puts */ 76: private final Condition notFull; 77: 78: // Internal helper methods 79: 80: /** 81: * Circularly increment i. 82: */ 83: final int inc(int i) { 84: return (++i == items.length)? 0 : i; 85: } 86: 87: /** 88: * Inserts element at current put position, advances, and signals. 89: * Call only when holding lock. 90: */ 91: private void insert(E x) { 92: items[putIndex] = x; 93: putIndex = inc(putIndex); 94: ++count; 95: notEmpty.signal(); 96: } 97: 98: /** 99: * Extracts element at current take position, advances, and signals. 100: * Call only when holding lock. 101: */ 102: private E extract() { 103: final E[] items = this.items; 104: E x = items[takeIndex]; 105: items[takeIndex] = null; 106: takeIndex = inc(takeIndex); 107: --count; 108: notFull.signal(); 109: return x; 110: } 111: 112: /** 113: * Utility for remove and iterator.remove: Delete item at position i. 114: * Call only when holding lock. 115: */ 116: void removeAt(int i) { 117: final E[] items = this.items; 118: // if removing front item, just advance 119: if (i == takeIndex) { 120: items[takeIndex] = null; 121: takeIndex = inc(takeIndex); 122: } else { 123: // slide over all others up through putIndex. 124: for (;;) { 125: int nexti = inc(i); 126: if (nexti != putIndex) { 127: items[i] = items[nexti]; 128: i = nexti; 129: } else { 130: items[i] = null; 131: putIndex = i; 132: break; 133: } 134: } 135: } 136: --count; 137: notFull.signal(); 138: } 139: 140: /** 141: * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed) 142: * capacity and default access policy. 143: * 144: * @param capacity the capacity of this queue 145: * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 146: */ 147: public ArrayBlockingQueue(int capacity) { 148: this(capacity, false); 149: } 150: 151: /** 152: * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed) 153: * capacity and the specified access policy. 154: * 155: * @param capacity the capacity of this queue 156: * @param fair if <tt>true</tt> then queue accesses for threads blocked 157: * on insertion or removal, are processed in FIFO order; 158: * if <tt>false</tt> the access order is unspecified. 159: * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 160: */ 161: public ArrayBlockingQueue(int capacity, boolean fair) { 162: if (capacity <= 0) 163: throw new IllegalArgumentException(); 164: this.items = (E[]) new Object[capacity]; 165: lock = new ReentrantLock(fair); 166: notEmpty = lock.newCondition(); 167: notFull = lock.newCondition(); 168: } 169: 170: /** 171: * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed) 172: * capacity, the specified access policy and initially containing the 173: * elements of the given collection, 174: * added in traversal order of the collection's iterator. 175: * 176: * @param capacity the capacity of this queue 177: * @param fair if <tt>true</tt> then queue accesses for threads blocked 178: * on insertion or removal, are processed in FIFO order; 179: * if <tt>false</tt> the access order is unspecified. 180: * @param c the collection of elements to initially contain 181: * @throws IllegalArgumentException if <tt>capacity</tt> is less than 182: * <tt>c.size()</tt>, or less than 1. 183: * @throws NullPointerException if the specified collection or any 184: * of its elements are null 185: */ 186: public ArrayBlockingQueue(int capacity, boolean fair, 187: Collection<? extends E> c) { 188: this(capacity, fair); 189: if (capacity < c.size()) 190: throw new IllegalArgumentException(); 191: 192: for (Iterator<? extends E> it = c.iterator(); it.hasNext();) 193: add(it.next()); 194: } 195: 196: /** 197: * Inserts the specified element at the tail of this queue if it is 198: * possible to do so immediately without exceeding the queue's capacity, 199: * returning <tt>true</tt> upon success and throwing an 200: * <tt>IllegalStateException</tt> if this queue is full. 201: * 202: * @param e the element to add 203: * @return <tt>true</tt> (as specified by {@link Collection#add}) 204: * @throws IllegalStateException if this queue is full 205: * @throws NullPointerException if the specified element is null 206: */ 207: public boolean add(E e) { 208: return super.add(e); 209: } 210: 211: /** 212: * Inserts the specified element at the tail of this queue if it is 213: * possible to do so immediately without exceeding the queue's capacity, 214: * returning <tt>true</tt> upon success and <tt>false</tt> if this queue 215: * is full. This method is generally preferable to method {@link #add}, 216: * which can fail to insert an element only by throwing an exception. 217: * 218: * @throws NullPointerException if the specified element is null 219: */ 220: public boolean offer(E e) { 221: if (e == null) throw new NullPointerException(); 222: final ReentrantLock lock = this.lock; 223: lock.lock(); 224: try { 225: if (count == items.length) 226: return false; 227: else { 228: insert(e); 229: return true; 230: } 231: } finally { 232: lock.unlock(); 233: } 234: } 235: 236: /** 237: * Inserts the specified element at the tail of this queue, waiting 238: * for space to become available if the queue is full. 239: * 240: * @throws InterruptedException {@inheritDoc} 241: * @throws NullPointerException {@inheritDoc} 242: */ 243: public void put(E e) throws InterruptedException { 244: if (e == null) throw new NullPointerException(); 245: final E[] items = this.items; 246: final ReentrantLock lock = this.lock; 247: lock.lockInterruptibly(); 248: try { 249: try { 250: while (count == items.length) 251: notFull.await(); 252: } catch (InterruptedException ie) { 253: notFull.signal(); // propagate to non-interrupted thread 254: throw ie; 255: } 256: insert(e); 257: } finally { 258: lock.unlock(); 259: } 260: } 261: 262: /** 263: * Inserts the specified element at the tail of this queue, waiting 264: * up to the specified wait time for space to become available if 265: * the queue is full. 266: * 267: * @throws InterruptedException {@inheritDoc} 268: * @throws NullPointerException {@inheritDoc} 269: */ 270: public boolean offer(E e, long timeout, TimeUnit unit) 271: throws InterruptedException { 272: 273: if (e == null) throw new NullPointerException(); 274: long nanos = unit.toNanos(timeout); 275: final ReentrantLock lock = this.lock; 276: lock.lockInterruptibly(); 277: try { 278: for (;;) { 279: if (count != items.length) { 280: insert(e); 281: return true; 282: } 283: if (nanos <= 0) 284: return false; 285: try { 286: nanos = notFull.awaitNanos(nanos); 287: } catch (InterruptedException ie) { 288: notFull.signal(); // propagate to non-interrupted thread 289: throw ie; 290: } 291: } 292: } finally { 293: lock.unlock(); 294: } 295: } 296: 297: public E poll() { 298: final ReentrantLock lock = this.lock; 299: lock.lock(); 300: try { 301: if (count == 0) 302: return null; 303: E x = extract(); 304: return x; 305: } finally { 306: lock.unlock(); 307: } 308: } 309: 310: public E take() throws InterruptedException { 311: final ReentrantLock lock = this.lock; 312: lock.lockInterruptibly(); 313: try { 314: try { 315: while (count == 0) 316: notEmpty.await(); 317: } catch (InterruptedException ie) { 318: notEmpty.signal(); // propagate to non-interrupted thread 319: throw ie; 320: } 321: E x = extract(); 322: return x; 323: } finally { 324: lock.unlock(); 325: } 326: } 327: 328: public E poll(long timeout, TimeUnit unit) throws InterruptedException { 329: long nanos = unit.toNanos(timeout); 330: final ReentrantLock lock = this.lock; 331: lock.lockInterruptibly(); 332: try { 333: for (;;) { 334: if (count != 0) { 335: E x = extract(); 336: return x; 337: } 338: if (nanos <= 0) 339: return null; 340: try { 341: nanos = notEmpty.awaitNanos(nanos); 342: } catch (InterruptedException ie) { 343: notEmpty.signal(); // propagate to non-interrupted thread 344: throw ie; 345: } 346: 347: } 348: } finally { 349: lock.unlock(); 350: } 351: } 352: 353: public E peek() { 354: final ReentrantLock lock = this.lock; 355: lock.lock(); 356: try { 357: return (count == 0) ? null : items[takeIndex]; 358: } finally { 359: lock.unlock(); 360: } 361: } 362: 363: // this doc comment is overridden to remove the reference to collections 364: // greater in size than Integer.MAX_VALUE 365: /** 366: * Returns the number of elements in this queue. 367: * 368: * @return the number of elements in this queue 369: */ 370: public int size() { 371: final ReentrantLock lock = this.lock; 372: lock.lock(); 373: try { 374: return count; 375: } finally { 376: lock.unlock(); 377: } 378: } 379: 380: // this doc comment is a modified copy of the inherited doc comment, 381: // without the reference to unlimited queues. 382: /** 383: * Returns the number of additional elements that this queue can ideally 384: * (in the absence of memory or resource constraints) accept without 385: * blocking. This is always equal to the initial capacity of this queue 386: * less the current <tt>size</tt> of this queue. 387: * 388: * <p>Note that you <em>cannot</em> always tell if an attempt to insert 389: * an element will succeed by inspecting <tt>remainingCapacity</tt> 390: * because it may be the case that another thread is about to 391: * insert or remove an element. 392: */ 393: public int remainingCapacity() { 394: final ReentrantLock lock = this.lock; 395: lock.lock(); 396: try { 397: return items.length - count; 398: } finally { 399: lock.unlock(); 400: } 401: } 402: 403: /** 404: * Removes a single instance of the specified element from this queue, 405: * if it is present. More formally, removes an element <tt>e</tt> such 406: * that <tt>o.equals(e)</tt>, if this queue contains one or more such 407: * elements. 408: * Returns <tt>true</tt> if this queue contained the specified element 409: * (or equivalently, if this queue changed as a result of the call). 410: * 411: * @param o element to be removed from this queue, if present 412: * @return <tt>true</tt> if this queue changed as a result of the call 413: */ 414: public boolean remove(Object o) { 415: if (o == null) return false; 416: final E[] items = this.items; 417: final ReentrantLock lock = this.lock; 418: lock.lock(); 419: try { 420: int i = takeIndex; 421: int k = 0; 422: for (;;) { 423: if (k++ >= count) 424: return false; 425: if (o.equals(items[i])) { 426: removeAt(i); 427: return true; 428: } 429: i = inc(i); 430: } 431: 432: } finally { 433: lock.unlock(); 434: } 435: } 436: 437: /** 438: * Returns <tt>true</tt> if this queue contains the specified element. 439: * More formally, returns <tt>true</tt> if and only if this queue contains 440: * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>. 441: * 442: * @param o object to be checked for containment in this queue 443: * @return <tt>true</tt> if this queue contains the specified element 444: */ 445: public boolean contains(Object o) { 446: if (o == null) return false; 447: final E[] items = this.items; 448: final ReentrantLock lock = this.lock; 449: lock.lock(); 450: try { 451: int i = takeIndex; 452: int k = 0; 453: while (k++ < count) { 454: if (o.equals(items[i])) 455: return true; 456: i = inc(i); 457: } 458: return false; 459: } finally { 460: lock.unlock(); 461: } 462: } 463: 464: /** 465: * Returns an array containing all of the elements in this queue, in 466: * proper sequence. 467: * 468: * <p>The returned array will be "safe" in that no references to it are 469: * maintained by this queue. (In other words, this method must allocate 470: * a new array). The caller is thus free to modify the returned array. 471: * 472: * <p>This method acts as bridge between array-based and collection-based 473: * APIs. 474: * 475: * @return an array containing all of the elements in this queue 476: */ 477: public Object[] toArray() { 478: final E[] items = this.items; 479: final ReentrantLock lock = this.lock; 480: lock.lock(); 481: try { 482: Object[] a = new Object[count]; 483: int k = 0; 484: int i = takeIndex; 485: while (k < count) { 486: a[k++] = items[i]; 487: i = inc(i); 488: } 489: return a; 490: } finally { 491: lock.unlock(); 492: } 493: } 494: 495: /** 496: * Returns an array containing all of the elements in this queue, in 497: * proper sequence; the runtime type of the returned array is that of 498: * the specified array. If the queue fits in the specified array, it 499: * is returned therein. Otherwise, a new array is allocated with the 500: * runtime type of the specified array and the size of this queue. 501: * 502: * <p>If this queue fits in the specified array with room to spare 503: * (i.e., the array has more elements than this queue), the element in 504: * the array immediately following the end of the queue is set to 505: * <tt>null</tt>. 506: * 507: * <p>Like the {@link #toArray()} method, this method acts as bridge between 508: * array-based and collection-based APIs. Further, this method allows 509: * precise control over the runtime type of the output array, and may, 510: * under certain circumstances, be used to save allocation costs. 511: * 512: * <p>Suppose <tt>x</tt> is a queue known to contain only strings. 513: * The following code can be used to dump the queue into a newly 514: * allocated array of <tt>String</tt>: 515: * 516: * <pre> 517: * String[] y = x.toArray(new String[0]);</pre> 518: * 519: * Note that <tt>toArray(new Object[0])</tt> is identical in function to 520: * <tt>toArray()</tt>. 521: * 522: * @param a the array into which the elements of the queue are to 523: * be stored, if it is big enough; otherwise, a new array of the 524: * same runtime type is allocated for this purpose 525: * @return an array containing all of the elements in this queue 526: * @throws ArrayStoreException if the runtime type of the specified array 527: * is not a supertype of the runtime type of every element in 528: * this queue 529: * @throws NullPointerException if the specified array is null 530: */ 531: public <T> T[] toArray(T[] a) { 532: final E[] items = this.items; 533: final ReentrantLock lock = this.lock; 534: lock.lock(); 535: try { 536: if (a.length < count) 537: a = (T[])java.lang.reflect.Array.newInstance( 538: a.getClass().getComponentType(), 539: count 540: ); 541: 542: int k = 0; 543: int i = takeIndex; 544: while (k < count) { 545: a[k++] = (T)items[i]; 546: i = inc(i); 547: } 548: if (a.length > count) 549: a[count] = null; 550: return a; 551: } finally { 552: lock.unlock(); 553: } 554: } 555: 556: public String toString() { 557: final ReentrantLock lock = this.lock; 558: lock.lock(); 559: try { 560: return super.toString(); 561: } finally { 562: lock.unlock(); 563: } 564: } 565: 566: /** 567: * Atomically removes all of the elements from this queue. 568: * The queue will be empty after this call returns. 569: */ 570: public void clear() { 571: final E[] items = this.items; 572: final ReentrantLock lock = this.lock; 573: lock.lock(); 574: try { 575: int i = takeIndex; 576: int k = count; 577: while (k-- > 0) { 578: items[i] = null; 579: i = inc(i); 580: } 581: count = 0; 582: putIndex = 0; 583: takeIndex = 0; 584: notFull.signalAll(); 585: } finally { 586: lock.unlock(); 587: } 588: } 589: 590: /** 591: * @throws UnsupportedOperationException {@inheritDoc} 592: * @throws ClassCastException {@inheritDoc} 593: * @throws NullPointerException {@inheritDoc} 594: * @throws IllegalArgumentException {@inheritDoc} 595: */ 596: public int drainTo(Collection<? super E> c) { 597: if (c == null) 598: throw new NullPointerException(); 599: if (c == this) 600: throw new IllegalArgumentException(); 601: final E[] items = this.items; 602: final ReentrantLock lock = this.lock; 603: lock.lock(); 604: try { 605: int i = takeIndex; 606: int n = 0; 607: int max = count; 608: while (n < max) { 609: c.add(items[i]); 610: items[i] = null; 611: i = inc(i); 612: ++n; 613: } 614: if (n > 0) { 615: count = 0; 616: putIndex = 0; 617: takeIndex = 0; 618: notFull.signalAll(); 619: } 620: return n; 621: } finally { 622: lock.unlock(); 623: } 624: } 625: 626: /** 627: * @throws UnsupportedOperationException {@inheritDoc} 628: * @throws ClassCastException {@inheritDoc} 629: * @throws NullPointerException {@inheritDoc} 630: * @throws IllegalArgumentException {@inheritDoc} 631: */ 632: public int drainTo(Collection<? super E> c, int maxElements) { 633: if (c == null) 634: throw new NullPointerException(); 635: if (c == this) 636: throw new IllegalArgumentException(); 637: if (maxElements <= 0) 638: return 0; 639: final E[] items = this.items; 640: final ReentrantLock lock = this.lock; 641: lock.lock(); 642: try { 643: int i = takeIndex; 644: int n = 0; 645: int sz = count; 646: int max = (maxElements < count)? maxElements : count; 647: while (n < max) { 648: c.add(items[i]); 649: items[i] = null; 650: i = inc(i); 651: ++n; 652: } 653: if (n > 0) { 654: count -= n; 655: takeIndex = i; 656: notFull.signalAll(); 657: } 658: return n; 659: } finally { 660: lock.unlock(); 661: } 662: } 663: 664: 665: /** 666: * Returns an iterator over the elements in this queue in proper sequence. 667: * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that 668: * will never throw {@link ConcurrentModificationException}, 669: * and guarantees to traverse elements as they existed upon 670: * construction of the iterator, and may (but is not guaranteed to) 671: * reflect any modifications subsequent to construction. 672: * 673: * @return an iterator over the elements in this queue in proper sequence 674: */ 675: public Iterator<E> iterator() { 676: final ReentrantLock lock = this.lock; 677: lock.lock(); 678: try { 679: return new Itr(); 680: } finally { 681: lock.unlock(); 682: } 683: } 684: 685: /** 686: * Iterator for ArrayBlockingQueue 687: */ 688: private class Itr implements Iterator<E> { 689: /** 690: * Index of element to be returned by next, 691: * or a negative number if no such. 692: */ 693: private int nextIndex; 694: 695: /** 696: * nextItem holds on to item fields because once we claim 697: * that an element exists in hasNext(), we must return it in 698: * the following next() call even if it was in the process of 699: * being removed when hasNext() was called. 700: */ 701: private E nextItem; 702: 703: /** 704: * Index of element returned by most recent call to next. 705: * Reset to -1 if this element is deleted by a call to remove. 706: */ 707: private int lastRet; 708: 709: Itr() { 710: lastRet = -1; 711: if (count == 0) 712: nextIndex = -1; 713: else { 714: nextIndex = takeIndex; 715: nextItem = items[takeIndex]; 716: } 717: } 718: 719: public boolean hasNext() { 720: /* 721: * No sync. We can return true by mistake here 722: * only if this iterator passed across threads, 723: * which we don't support anyway. 724: */ 725: return nextIndex >= 0; 726: } 727: 728: /** 729: * Checks whether nextIndex is valid; if so setting nextItem. 730: * Stops iterator when either hits putIndex or sees null item. 731: */ 732: private void checkNext() { 733: if (nextIndex == putIndex) { 734: nextIndex = -1; 735: nextItem = null; 736: } else { 737: nextItem = items[nextIndex]; 738: if (nextItem == null) 739: nextIndex = -1; 740: } 741: } 742: 743: public E next() { 744: final ReentrantLock lock = ArrayBlockingQueue.this.lock; 745: lock.lock(); 746: try { 747: if (nextIndex < 0) 748: throw new NoSuchElementException(); 749: lastRet = nextIndex; 750: E x = nextItem; 751: nextIndex = inc(nextIndex); 752: checkNext(); 753: return x; 754: } finally { 755: lock.unlock(); 756: } 757: } 758: 759: public void remove() { 760: final ReentrantLock lock = ArrayBlockingQueue.this.lock; 761: lock.lock(); 762: try { 763: int i = lastRet; 764: if (i == -1) 765: throw new IllegalStateException(); 766: lastRet = -1; 767: 768: int ti = takeIndex; 769: removeAt(i); 770: // back up cursor (reset to front if was first element) 771: nextIndex = (i == ti) ? takeIndex : i; 772: checkNext(); 773: } finally { 774: lock.unlock(); 775: } 776: } 777: } 778: }
GNU Classpath (0.98) |