GNU Classpath (0.97.2) | |
Frames | No Frames |
1: /* 2: * Written by Doug Lea with assistance from members of JCP JSR-166 3: * Expert Group and released to the public domain, as explained at 4: * http://creativecommons.org/licenses/publicdomain 5: */ 6: 7: 8: package java.util.concurrent; 9: import java.util.concurrent.locks.*; 10: import java.util.*; 11: 12: /** 13: * An unbounded {@linkplain BlockingQueue blocking queue} of 14: * <tt>Delayed</tt> elements, in which an element can only be taken 15: * when its delay has expired. The <em>head</em> of the queue is that 16: * <tt>Delayed</tt> element whose delay expired furthest in the 17: * past. If no delay has expired there is no head and <tt>poll</tt> 18: * will return <tt>null</tt>. Expiration occurs when an element's 19: * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less 20: * than or equal to zero. Even though unexpired elements cannot be 21: * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise 22: * treated as normal elements. For example, the <tt>size</tt> method 23: * returns the count of both expired and unexpired elements. 24: * This queue does not permit null elements. 25: * 26: * <p>This class and its iterator implement all of the 27: * <em>optional</em> methods of the {@link Collection} and {@link 28: * Iterator} interfaces. 29: * 30: * <p>This class is a member of the 31: * <a href="{@docRoot}/../technotes/guides/collections/index.html"> 32: * Java Collections Framework</a>. 33: * 34: * @since 1.5 35: * @author Doug Lea 36: * @param <E> the type of elements held in this collection 37: */ 38: 39: public class DelayQueue<E extends Delayed> extends AbstractQueue<E> 40: implements BlockingQueue<E> { 41: 42: private transient final ReentrantLock lock = new ReentrantLock(); 43: private transient final Condition available = lock.newCondition(); 44: private final PriorityQueue<E> q = new PriorityQueue<E>(); 45: 46: /** 47: * Creates a new <tt>DelayQueue</tt> that is initially empty. 48: */ 49: public DelayQueue() {} 50: 51: /** 52: * Creates a <tt>DelayQueue</tt> initially containing the elements of the 53: * given collection of {@link Delayed} instances. 54: * 55: * @param c the collection of elements to initially contain 56: * @throws NullPointerException if the specified collection or any 57: * of its elements are null 58: */ 59: public DelayQueue(Collection<? extends E> c) { 60: this.addAll(c); 61: } 62: 63: /** 64: * Inserts the specified element into this delay queue. 65: * 66: * @param e the element to add 67: * @return <tt>true</tt> (as specified by {@link Collection#add}) 68: * @throws NullPointerException if the specified element is null 69: */ 70: public boolean add(E e) { 71: return offer(e); 72: } 73: 74: /** 75: * Inserts the specified element into this delay queue. 76: * 77: * @param e the element to add 78: * @return <tt>true</tt> 79: * @throws NullPointerException if the specified element is null 80: */ 81: public boolean offer(E e) { 82: final ReentrantLock lock = this.lock; 83: lock.lock(); 84: try { 85: E first = q.peek(); 86: q.offer(e); 87: if (first == null || e.compareTo(first) < 0) 88: available.signalAll(); 89: return true; 90: } finally { 91: lock.unlock(); 92: } 93: } 94: 95: /** 96: * Inserts the specified element into this delay queue. As the queue is 97: * unbounded this method will never block. 98: * 99: * @param e the element to add 100: * @throws NullPointerException {@inheritDoc} 101: */ 102: public void put(E e) { 103: offer(e); 104: } 105: 106: /** 107: * Inserts the specified element into this delay queue. As the queue is 108: * unbounded this method will never block. 109: * 110: * @param e the element to add 111: * @param timeout This parameter is ignored as the method never blocks 112: * @param unit This parameter is ignored as the method never blocks 113: * @return <tt>true</tt> 114: * @throws NullPointerException {@inheritDoc} 115: */ 116: public boolean offer(E e, long timeout, TimeUnit unit) { 117: return offer(e); 118: } 119: 120: /** 121: * Retrieves and removes the head of this queue, or returns <tt>null</tt> 122: * if this queue has no elements with an expired delay. 123: * 124: * @return the head of this queue, or <tt>null</tt> if this 125: * queue has no elements with an expired delay 126: */ 127: public E poll() { 128: final ReentrantLock lock = this.lock; 129: lock.lock(); 130: try { 131: E first = q.peek(); 132: if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 133: return null; 134: else { 135: E x = q.poll(); 136: assert x != null; 137: if (q.size() != 0) 138: available.signalAll(); 139: return x; 140: } 141: } finally { 142: lock.unlock(); 143: } 144: } 145: 146: /** 147: * Retrieves and removes the head of this queue, waiting if necessary 148: * until an element with an expired delay is available on this queue. 149: * 150: * @return the head of this queue 151: * @throws InterruptedException {@inheritDoc} 152: */ 153: public E take() throws InterruptedException { 154: final ReentrantLock lock = this.lock; 155: lock.lockInterruptibly(); 156: try { 157: for (;;) { 158: E first = q.peek(); 159: if (first == null) { 160: available.await(); 161: } else { 162: long delay = first.getDelay(TimeUnit.NANOSECONDS); 163: if (delay > 0) { 164: long tl = available.awaitNanos(delay); 165: } else { 166: E x = q.poll(); 167: assert x != null; 168: if (q.size() != 0) 169: available.signalAll(); // wake up other takers 170: return x; 171: 172: } 173: } 174: } 175: } finally { 176: lock.unlock(); 177: } 178: } 179: 180: /** 181: * Retrieves and removes the head of this queue, waiting if necessary 182: * until an element with an expired delay is available on this queue, 183: * or the specified wait time expires. 184: * 185: * @return the head of this queue, or <tt>null</tt> if the 186: * specified waiting time elapses before an element with 187: * an expired delay becomes available 188: * @throws InterruptedException {@inheritDoc} 189: */ 190: public E poll(long timeout, TimeUnit unit) throws InterruptedException { 191: long nanos = unit.toNanos(timeout); 192: final ReentrantLock lock = this.lock; 193: lock.lockInterruptibly(); 194: try { 195: for (;;) { 196: E first = q.peek(); 197: if (first == null) { 198: if (nanos <= 0) 199: return null; 200: else 201: nanos = available.awaitNanos(nanos); 202: } else { 203: long delay = first.getDelay(TimeUnit.NANOSECONDS); 204: if (delay > 0) { 205: if (nanos <= 0) 206: return null; 207: if (delay > nanos) 208: delay = nanos; 209: long timeLeft = available.awaitNanos(delay); 210: nanos -= delay - timeLeft; 211: } else { 212: E x = q.poll(); 213: assert x != null; 214: if (q.size() != 0) 215: available.signalAll(); 216: return x; 217: } 218: } 219: } 220: } finally { 221: lock.unlock(); 222: } 223: } 224: 225: /** 226: * Retrieves, but does not remove, the head of this queue, or 227: * returns <tt>null</tt> if this queue is empty. Unlike 228: * <tt>poll</tt>, if no expired elements are available in the queue, 229: * this method returns the element that will expire next, 230: * if one exists. 231: * 232: * @return the head of this queue, or <tt>null</tt> if this 233: * queue is empty. 234: */ 235: public E peek() { 236: final ReentrantLock lock = this.lock; 237: lock.lock(); 238: try { 239: return q.peek(); 240: } finally { 241: lock.unlock(); 242: } 243: } 244: 245: public int size() { 246: final ReentrantLock lock = this.lock; 247: lock.lock(); 248: try { 249: return q.size(); 250: } finally { 251: lock.unlock(); 252: } 253: } 254: 255: /** 256: * @throws UnsupportedOperationException {@inheritDoc} 257: * @throws ClassCastException {@inheritDoc} 258: * @throws NullPointerException {@inheritDoc} 259: * @throws IllegalArgumentException {@inheritDoc} 260: */ 261: public int drainTo(Collection<? super E> c) { 262: if (c == null) 263: throw new NullPointerException(); 264: if (c == this) 265: throw new IllegalArgumentException(); 266: final ReentrantLock lock = this.lock; 267: lock.lock(); 268: try { 269: int n = 0; 270: for (;;) { 271: E first = q.peek(); 272: if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 273: break; 274: c.add(q.poll()); 275: ++n; 276: } 277: if (n > 0) 278: available.signalAll(); 279: return n; 280: } finally { 281: lock.unlock(); 282: } 283: } 284: 285: /** 286: * @throws UnsupportedOperationException {@inheritDoc} 287: * @throws ClassCastException {@inheritDoc} 288: * @throws NullPointerException {@inheritDoc} 289: * @throws IllegalArgumentException {@inheritDoc} 290: */ 291: public int drainTo(Collection<? super E> c, int maxElements) { 292: if (c == null) 293: throw new NullPointerException(); 294: if (c == this) 295: throw new IllegalArgumentException(); 296: if (maxElements <= 0) 297: return 0; 298: final ReentrantLock lock = this.lock; 299: lock.lock(); 300: try { 301: int n = 0; 302: while (n < maxElements) { 303: E first = q.peek(); 304: if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 305: break; 306: c.add(q.poll()); 307: ++n; 308: } 309: if (n > 0) 310: available.signalAll(); 311: return n; 312: } finally { 313: lock.unlock(); 314: } 315: } 316: 317: /** 318: * Atomically removes all of the elements from this delay queue. 319: * The queue will be empty after this call returns. 320: * Elements with an unexpired delay are not waited for; they are 321: * simply discarded from the queue. 322: */ 323: public void clear() { 324: final ReentrantLock lock = this.lock; 325: lock.lock(); 326: try { 327: q.clear(); 328: } finally { 329: lock.unlock(); 330: } 331: } 332: 333: /** 334: * Always returns <tt>Integer.MAX_VALUE</tt> because 335: * a <tt>DelayQueue</tt> is not capacity constrained. 336: * 337: * @return <tt>Integer.MAX_VALUE</tt> 338: */ 339: public int remainingCapacity() { 340: return Integer.MAX_VALUE; 341: } 342: 343: /** 344: * Returns an array containing all of the elements in this queue. 345: * The returned array elements are in no particular order. 346: * 347: * <p>The returned array will be "safe" in that no references to it are 348: * maintained by this queue. (In other words, this method must allocate 349: * a new array). The caller is thus free to modify the returned array. 350: * 351: * <p>This method acts as bridge between array-based and collection-based 352: * APIs. 353: * 354: * @return an array containing all of the elements in this queue 355: */ 356: public Object[] toArray() { 357: final ReentrantLock lock = this.lock; 358: lock.lock(); 359: try { 360: return q.toArray(); 361: } finally { 362: lock.unlock(); 363: } 364: } 365: 366: /** 367: * Returns an array containing all of the elements in this queue; the 368: * runtime type of the returned array is that of the specified array. 369: * The returned array elements are in no particular order. 370: * If the queue fits in the specified array, it is returned therein. 371: * Otherwise, a new array is allocated with the runtime type of the 372: * specified array and the size of this queue. 373: * 374: * <p>If this queue fits in the specified array with room to spare 375: * (i.e., the array has more elements than this queue), the element in 376: * the array immediately following the end of the queue is set to 377: * <tt>null</tt>. 378: * 379: * <p>Like the {@link #toArray()} method, this method acts as bridge between 380: * array-based and collection-based APIs. Further, this method allows 381: * precise control over the runtime type of the output array, and may, 382: * under certain circumstances, be used to save allocation costs. 383: * 384: * <p>The following code can be used to dump a delay queue into a newly 385: * allocated array of <tt>Delayed</tt>: 386: * 387: * <pre> 388: * Delayed[] a = q.toArray(new Delayed[0]);</pre> 389: * 390: * Note that <tt>toArray(new Object[0])</tt> is identical in function to 391: * <tt>toArray()</tt>. 392: * 393: * @param a the array into which the elements of the queue are to 394: * be stored, if it is big enough; otherwise, a new array of the 395: * same runtime type is allocated for this purpose 396: * @return an array containing all of the elements in this queue 397: * @throws ArrayStoreException if the runtime type of the specified array 398: * is not a supertype of the runtime type of every element in 399: * this queue 400: * @throws NullPointerException if the specified array is null 401: */ 402: public <T> T[] toArray(T[] a) { 403: final ReentrantLock lock = this.lock; 404: lock.lock(); 405: try { 406: return q.toArray(a); 407: } finally { 408: lock.unlock(); 409: } 410: } 411: 412: /** 413: * Removes a single instance of the specified element from this 414: * queue, if it is present, whether or not it has expired. 415: */ 416: public boolean remove(Object o) { 417: final ReentrantLock lock = this.lock; 418: lock.lock(); 419: try { 420: return q.remove(o); 421: } finally { 422: lock.unlock(); 423: } 424: } 425: 426: /** 427: * Returns an iterator over all the elements (both expired and 428: * unexpired) in this queue. The iterator does not return the 429: * elements in any particular order. The returned 430: * <tt>Iterator</tt> is a "weakly consistent" iterator that will 431: * never throw {@link ConcurrentModificationException}, and 432: * guarantees to traverse elements as they existed upon 433: * construction of the iterator, and may (but is not guaranteed 434: * to) reflect any modifications subsequent to construction. 435: * 436: * @return an iterator over the elements in this queue 437: */ 438: public Iterator<E> iterator() { 439: return new Itr(toArray()); 440: } 441: 442: /** 443: * Snapshot iterator that works off copy of underlying q array. 444: */ 445: private class Itr implements Iterator<E> { 446: final Object[] array; // Array of all elements 447: int cursor; // index of next element to return; 448: int lastRet; // index of last element, or -1 if no such 449: 450: Itr(Object[] array) { 451: lastRet = -1; 452: this.array = array; 453: } 454: 455: public boolean hasNext() { 456: return cursor < array.length; 457: } 458: 459: public E next() { 460: if (cursor >= array.length) 461: throw new NoSuchElementException(); 462: lastRet = cursor; 463: return (E)array[cursor++]; 464: } 465: 466: public void remove() { 467: if (lastRet < 0) 468: throw new IllegalStateException(); 469: Object x = array[lastRet]; 470: lastRet = -1; 471: // Traverse underlying queue to find == element, 472: // not just a .equals element. 473: lock.lock(); 474: try { 475: for (Iterator it = q.iterator(); it.hasNext(); ) { 476: if (it.next() == x) { 477: it.remove(); 478: return; 479: } 480: } 481: } finally { 482: lock.unlock(); 483: } 484: } 485: } 486: 487: }
GNU Classpath (0.97.2) |