GNU Classpath (0.98) | |
Frames | No Frames |
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;

import java.util.concurrent.locks.*;
import java.util.*;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations.  While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
 * <tt>null</tt> elements.  A priority queue relying on {@linkplain
 * Comparable natural ordering} also does not permit insertion of
 * non-comparable objects (doing so results in
 * <tt>ClassCastException</tt>).
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the PriorityBlockingQueue in any particular order. If you need
 * ordered traversal, consider using
 * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
 * can be used to <em>remove</em> some or all elements in priority
 * order and place them in another collection.
 *
 * <p>Operations on this class make no guarantees about the ordering
 * of elements with equal priority. If you need to enforce an
 * ordering, you can define custom classes or comparators that use a
 * secondary key to break ties in primary priority values.  For
 * example, here is a class that applies first-in-first-out
 * tie-breaking to comparable elements. To use it, you would insert a
 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
 *
 * <pre>
 * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
 *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
 *   final static AtomicLong seq = new AtomicLong();
 *   final long seqNum;
 *   final E entry;
 *   public FIFOEntry(E entry) {
 *     seqNum = seq.getAndIncrement();
 *     this.entry = entry;
 *   }
 *   public E getEntry() { return entry; }
 *   public int compareTo(FIFOEntry&lt;E&gt; other) {
 *     int res = entry.compareTo(other.entry);
 *     if (res == 0 &amp;&amp; other.entry != this.entry)
 *       res = (seqNum &lt; other.seqNum ? -1 : 1);
 *     return res;
 *   }
 * }</pre>
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /** Backing heap; every access goes through {@link #lock}. */
    private final PriorityQueue<E> q;
    /** Guards all access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Signalled whenever an element is inserted. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @param  comparator the comparator that will be used to order this
     *         priority queue.  If {@code null}, the {@linkplain Comparable
     *         natural ordering} of the elements will be used.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  If the specified collection is a
     * {@link SortedSet} or a {@link PriorityQueue},  this
     * priority queue will be ordered according to the same ordering.
     * Otherwise, this priority queue will be ordered according to the
     * {@linkplain Comparable natural ordering} of its elements.
     *
     * @param  c the collection whose elements are to be placed
     *         into this priority queue
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(e);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        offer(e); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting until an
     * element becomes available if the queue is currently empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit the unit in which <tt>timeout</tt> is expressed
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapsed before an element became available
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the remaining wait time
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the comparator used to order the elements in this queue,
     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
     * natural ordering} of its elements.
     *
     * @return the comparator used to order the elements in this queue,
     *         or <tt>null</tt> if this queue uses the natural
     *         ordering of its elements
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Returns the number of elements currently in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.  Returns {@code true} if and only if this queue contained
     * the specified element (or equivalently, if this queue changed as a
     * result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the string representation of the underlying queue,
     * computed while holding this queue's lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        return drain(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        return drain(c, maxElements);
    }

    /**
     * Moves up to <tt>maxElements</tt> elements, in priority order, from
     * this queue into <tt>c</tt> while holding the lock.  Arguments are
     * validated by the public <tt>drainTo</tt> methods.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     */
    private int drain(Collection<? super E> c, int maxElements) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            // Add to c before removing from q: if c.add throws, the
            // element stays in this queue instead of being lost.
            while (n < maxElements && (e = q.peek()) != null) {
                c.add(e);
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     * The returned <tt>Iterator</tt> is a "weakly consistent"
     * iterator that will never throw {@link
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return;
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            // The snapshot comes from toArray() on a queue of E, so
            // every slot holds an E.
            @SuppressWarnings("unchecked")
            E x = (E) array[cursor++];
            return x;
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            Object x = array[lastRet];
            lastRet = -1;
            // Traverse underlying queue to find == element,
            // not just a .equals element.
            lock.lock();
            try {
                for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                    if (it.next() == x) {
                        it.remove();
                        return;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue.  Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}
GNU Classpath (0.98) |