GNU Classpath (0.98) | |
Frames | No Frames |
1: /* 2: * Written by Doug Lea with assistance from members of JCP JSR-166 3: * Expert Group and released to the public domain, as explained at 4: * http://creativecommons.org/licenses/publicdomain 5: */ 6: 7: package java.util.concurrent; 8: import java.util.concurrent.atomic.*; 9: import java.util.*; 10: 11: /** 12: * A {@link ThreadPoolExecutor} that can additionally schedule 13: * commands to run after a given delay, or to execute 14: * periodically. This class is preferable to {@link java.util.Timer} 15: * when multiple worker threads are needed, or when the additional 16: * flexibility or capabilities of {@link ThreadPoolExecutor} (which 17: * this class extends) are required. 18: * 19: * <p> Delayed tasks execute no sooner than they are enabled, but 20: * without any real-time guarantees about when, after they are 21: * enabled, they will commence. Tasks scheduled for exactly the same 22: * execution time are enabled in first-in-first-out (FIFO) order of 23: * submission. 24: * 25: * <p>While this class inherits from {@link ThreadPoolExecutor}, a few 26: * of the inherited tuning methods are not useful for it. In 27: * particular, because it acts as a fixed-sized pool using 28: * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments 29: * to <tt>maximumPoolSize</tt> have no useful effect. 30: * 31: * <p><b>Extension notes:</b> This class overrides {@link 32: * AbstractExecutorService} <tt>submit</tt> methods to generate 33: * internal objects to control per-task delays and scheduling. To 34: * preserve functionality, any further overrides of these methods in 35: * subclasses must invoke superclass versions, which effectively 36: * disables additional task customization. However, this class 37: * provides alternative protected extension method 38: * <tt>decorateTask</tt> (one version each for <tt>Runnable</tt> and 39: * <tt>Callable</tt>) that can be used to customize the concrete task 40: * types used to execute commands entered via <tt>execute</tt>, 41: * <tt>submit</tt>, <tt>schedule</tt>, <tt>scheduleAtFixedRate</tt>, 42: * and <tt>scheduleWithFixedDelay</tt>. By default, a 43: * <tt>ScheduledThreadPoolExecutor</tt> uses a task type extending 44: * {@link FutureTask}. However, this may be modified or replaced using 45: * subclasses of the form: 46: * 47: * <pre> 48: * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { 49: * 50: * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } 51: * 52: * protected <V> RunnableScheduledFuture<V> decorateTask( 53: * Runnable r, RunnableScheduledFuture<V> task) { 54: * return new CustomTask<V>(r, task); 55: * } 56: * 57: * protected <V> RunnableScheduledFuture<V> decorateTask( 58: * Callable<V> c, RunnableScheduledFuture<V> task) { 59: * return new CustomTask<V>(c, task); 60: * } 61: * // ... add constructors, etc. 62: * } 63: * </pre> 64: * @since 1.5 65: * @author Doug Lea 66: */ 67: public class ScheduledThreadPoolExecutor 68: extends ThreadPoolExecutor 69: implements ScheduledExecutorService { 70: 71: /** 72: * False if should cancel/suppress periodic tasks on shutdown. 73: */ 74: private volatile boolean continueExistingPeriodicTasksAfterShutdown; 75: 76: /** 77: * False if should cancel non-periodic tasks on shutdown. 78: */ 79: private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 80: 81: /** 82: * Sequence number to break scheduling ties, and in turn to 83: * guarantee FIFO order among tied entries. 84: */ 85: private static final AtomicLong sequencer = new AtomicLong(0); 86: 87: /** Base of nanosecond timings, to avoid wrapping */ 88: private static final long NANO_ORIGIN = System.nanoTime(); 89: 90: /** 91: * Returns nanosecond time offset by origin 92: */ 93: final long now() { 94: return System.nanoTime() - NANO_ORIGIN; 95: } 96: 97: private class ScheduledFutureTask<V> 98: extends FutureTask<V> implements RunnableScheduledFuture<V> { 99: 100: /** Sequence number to break ties FIFO */ 101: private final long sequenceNumber; 102: /** The time the task is enabled to execute in nanoTime units */ 103: private long time; 104: /** 105: * Period in nanoseconds for repeating tasks. A positive 106: * value indicates fixed-rate execution. A negative value 107: * indicates fixed-delay execution. A value of 0 indicates a 108: * non-repeating task. 109: */ 110: private final long period; 111: 112: /** 113: * Creates a one-shot action with given nanoTime-based trigger time. 114: */ 115: ScheduledFutureTask(Runnable r, V result, long ns) { 116: super(r, result); 117: this.time = ns; 118: this.period = 0; 119: this.sequenceNumber = sequencer.getAndIncrement(); 120: } 121: 122: /** 123: * Creates a periodic action with given nano time and period. 124: */ 125: ScheduledFutureTask(Runnable r, V result, long ns, long period) { 126: super(r, result); 127: this.time = ns; 128: this.period = period; 129: this.sequenceNumber = sequencer.getAndIncrement(); 130: } 131: 132: /** 133: * Creates a one-shot action with given nanoTime-based trigger. 134: */ 135: ScheduledFutureTask(Callable<V> callable, long ns) { 136: super(callable); 137: this.time = ns; 138: this.period = 0; 139: this.sequenceNumber = sequencer.getAndIncrement(); 140: } 141: 142: public long getDelay(TimeUnit unit) { 143: long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); 144: return d; 145: } 146: 147: public int compareTo(Delayed other) { 148: if (other == this) // compare zero ONLY if same object 149: return 0; 150: if (other instanceof ScheduledFutureTask) { 151: ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 152: long diff = time - x.time; 153: if (diff < 0) 154: return -1; 155: else if (diff > 0) 156: return 1; 157: else if (sequenceNumber < x.sequenceNumber) 158: return -1; 159: else 160: return 1; 161: } 162: long d = (getDelay(TimeUnit.NANOSECONDS) - 163: other.getDelay(TimeUnit.NANOSECONDS)); 164: return (d == 0)? 0 : ((d < 0)? -1 : 1); 165: } 166: 167: /** 168: * Returns true if this is a periodic (not a one-shot) action. 169: * 170: * @return true if periodic 171: */ 172: public boolean isPeriodic() { 173: return period != 0; 174: } 175: 176: /** 177: * Runs a periodic task. 178: */ 179: private void runPeriodic() { 180: boolean ok = ScheduledFutureTask.super.runAndReset(); 181: boolean down = isShutdown(); 182: // Reschedule if not cancelled and not shutdown or policy allows 183: if (ok && (!down || 184: (getContinueExistingPeriodicTasksAfterShutdownPolicy() && 185: !isTerminating()))) { 186: long p = period; 187: if (p > 0) 188: time += p; 189: else 190: time = now() - p; 191: // Classpath local: ecj from eclipse 3.1 does not 192: // compile this. 193: // ScheduledThreadPoolExecutor.super.getQueue().add(this); 194: ScheduledThreadPoolExecutor.super.getQueue().add((Runnable) this); 195: } 196: // This might have been the final executed delayed 197: // task. Wake up threads to check. 198: else if (down) 199: interruptIdleWorkers(); 200: } 201: 202: /** 203: * Overrides FutureTask version so as to reset/requeue if periodic. 204: */ 205: public void run() { 206: if (isPeriodic()) 207: runPeriodic(); 208: else 209: ScheduledFutureTask.super.run(); 210: } 211: } 212: 213: /** 214: * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. 215: */ 216: private void delayedExecute(Runnable command) { 217: if (isShutdown()) { 218: reject(command); 219: return; 220: } 221: // Prestart a thread if necessary. We cannot prestart it 222: // running the task because the task (probably) shouldn't be 223: // run yet, so thread will just idle until delay elapses. 224: if (getPoolSize() < getCorePoolSize()) 225: prestartCoreThread(); 226: 227: super.getQueue().add(command); 228: } 229: 230: /** 231: * Cancels and clears the queue of all tasks that should not be run 232: * due to shutdown policy. 233: */ 234: private void cancelUnwantedTasks() { 235: boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); 236: boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); 237: if (!keepDelayed && !keepPeriodic) 238: super.getQueue().clear(); 239: else if (keepDelayed || keepPeriodic) { 240: Object[] entries = super.getQueue().toArray(); 241: for (int i = 0; i < entries.length; ++i) { 242: Object e = entries[i]; 243: if (e instanceof RunnableScheduledFuture) { 244: RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; 245: if (t.isPeriodic()? !keepPeriodic : !keepDelayed) 246: t.cancel(false); 247: } 248: } 249: entries = null; 250: purge(); 251: } 252: } 253: 254: public boolean remove(Runnable task) { 255: if (!(task instanceof RunnableScheduledFuture)) 256: return false; 257: return getQueue().remove(task); 258: } 259: 260: /** 261: * Modifies or replaces the task used to execute a runnable. 262: * This method can be used to override the concrete 263: * class used for managing internal tasks. 264: * The default implementation simply returns the given task. 265: * 266: * @param runnable the submitted Runnable 267: * @param task the task created to execute the runnable 268: * @return a task that can execute the runnable 269: * @since 1.6 270: */ 271: protected <V> RunnableScheduledFuture<V> decorateTask( 272: Runnable runnable, RunnableScheduledFuture<V> task) { 273: return task; 274: } 275: 276: /** 277: * Modifies or replaces the task used to execute a callable. 278: * This method can be used to override the concrete 279: * class used for managing internal tasks. 280: * The default implementation simply returns the given task. 281: * 282: * @param callable the submitted Callable 283: * @param task the task created to execute the callable 284: * @return a task that can execute the callable 285: * @since 1.6 286: */ 287: protected <V> RunnableScheduledFuture<V> decorateTask( 288: Callable<V> callable, RunnableScheduledFuture<V> task) { 289: return task; 290: } 291: 292: /** 293: * Creates a new ScheduledThreadPoolExecutor with the given core 294: * pool size. 295: * 296: * @param corePoolSize the number of threads to keep in the pool, 297: * even if they are idle 298: * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> 299: */ 300: public ScheduledThreadPoolExecutor(int corePoolSize) { 301: super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 302: new DelayedWorkQueue()); 303: } 304: 305: /** 306: * Creates a new ScheduledThreadPoolExecutor with the given 307: * initial parameters. 308: * 309: * @param corePoolSize the number of threads to keep in the pool, 310: * even if they are idle 311: * @param threadFactory the factory to use when the executor 312: * creates a new thread 313: * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> 314: * @throws NullPointerException if threadFactory is null 315: */ 316: public ScheduledThreadPoolExecutor(int corePoolSize, 317: ThreadFactory threadFactory) { 318: super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 319: new DelayedWorkQueue(), threadFactory); 320: } 321: 322: /** 323: * Creates a new ScheduledThreadPoolExecutor with the given 324: * initial parameters. 325: * 326: * @param corePoolSize the number of threads to keep in the pool, 327: * even if they are idle 328: * @param handler the handler to use when execution is blocked 329: * because the thread bounds and queue capacities are reached 330: * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> 331: * @throws NullPointerException if handler is null 332: */ 333: public ScheduledThreadPoolExecutor(int corePoolSize, 334: RejectedExecutionHandler handler) { 335: super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 336: new DelayedWorkQueue(), handler); 337: } 338: 339: /** 340: * Creates a new ScheduledThreadPoolExecutor with the given 341: * initial parameters. 342: * 343: * @param corePoolSize the number of threads to keep in the pool, 344: * even if they are idle 345: * @param threadFactory the factory to use when the executor 346: * creates a new thread 347: * @param handler the handler to use when execution is blocked 348: * because the thread bounds and queue capacities are reached. 349: * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> 350: * @throws NullPointerException if threadFactory or handler is null 351: */ 352: public ScheduledThreadPoolExecutor(int corePoolSize, 353: ThreadFactory threadFactory, 354: RejectedExecutionHandler handler) { 355: super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 356: new DelayedWorkQueue(), threadFactory, handler); 357: } 358: 359: public ScheduledFuture<?> schedule(Runnable command, 360: long delay, 361: TimeUnit unit) { 362: if (command == null || unit == null) 363: throw new NullPointerException(); 364: long triggerTime = now() + unit.toNanos(delay); 365: RunnableScheduledFuture<?> t = decorateTask(command, 366: new ScheduledFutureTask<Boolean>(command, null, triggerTime)); 367: delayedExecute(t); 368: return t; 369: } 370: 371: public <V> ScheduledFuture<V> schedule(Callable<V> callable, 372: long delay, 373: TimeUnit unit) { 374: if (callable == null || unit == null) 375: throw new NullPointerException(); 376: if (delay < 0) delay = 0; 377: long triggerTime = now() + unit.toNanos(delay); 378: RunnableScheduledFuture<V> t = decorateTask(callable, 379: new ScheduledFutureTask<V>(callable, triggerTime)); 380: delayedExecute(t); 381: return t; 382: } 383: 384: public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 385: long initialDelay, 386: long period, 387: TimeUnit unit) { 388: if (command == null || unit == null) 389: throw new NullPointerException(); 390: if (period <= 0) 391: throw new IllegalArgumentException(); 392: if (initialDelay < 0) initialDelay = 0; 393: long triggerTime = now() + unit.toNanos(initialDelay); 394: RunnableScheduledFuture<?> t = decorateTask(command, 395: new ScheduledFutureTask<Object>(command, 396: null, 397: triggerTime, 398: unit.toNanos(period))); 399: delayedExecute(t); 400: return t; 401: } 402: 403: public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 404: long initialDelay, 405: long delay, 406: TimeUnit unit) { 407: if (command == null || unit == null) 408: throw new NullPointerException(); 409: if (delay <= 0) 410: throw new IllegalArgumentException(); 411: if (initialDelay < 0) initialDelay = 0; 412: long triggerTime = now() + unit.toNanos(initialDelay); 413: RunnableScheduledFuture<?> t = decorateTask(command, 414: new ScheduledFutureTask<Boolean>(command, 415: null, 416: triggerTime, 417: unit.toNanos(-delay))); 418: delayedExecute(t); 419: return t; 420: } 421: 422: 423: /** 424: * Executes command with zero required delay. This has effect 425: * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note 426: * that inspections of the queue and of the list returned by 427: * <tt>shutdownNow</tt> will access the zero-delayed 428: * {@link ScheduledFuture}, not the <tt>command</tt> itself. 429: * 430: * @param command the task to execute 431: * @throws RejectedExecutionException at discretion of 432: * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted 433: * for execution because the executor has been shut down. 434: * @throws NullPointerException if command is null 435: */ 436: public void execute(Runnable command) { 437: if (command == null) 438: throw new NullPointerException(); 439: schedule(command, 0, TimeUnit.NANOSECONDS); 440: } 441: 442: // Override AbstractExecutorService methods 443: 444: public Future<?> submit(Runnable task) { 445: return schedule(task, 0, TimeUnit.NANOSECONDS); 446: } 447: 448: public <T> Future<T> submit(Runnable task, T result) { 449: return schedule(Executors.callable(task, result), 450: 0, TimeUnit.NANOSECONDS); 451: } 452: 453: public <T> Future<T> submit(Callable<T> task) { 454: return schedule(task, 0, TimeUnit.NANOSECONDS); 455: } 456: 457: /** 458: * Sets the policy on whether to continue executing existing periodic 459: * tasks even when this executor has been <tt>shutdown</tt>. In 460: * this case, these tasks will only terminate upon 461: * <tt>shutdownNow</tt>, or after setting the policy to 462: * <tt>false</tt> when already shutdown. This value is by default 463: * false. 464: * 465: * @param value if true, continue after shutdown, else don't. 466: * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy 467: */ 468: public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { 469: continueExistingPeriodicTasksAfterShutdown = value; 470: if (!value && isShutdown()) 471: cancelUnwantedTasks(); 472: } 473: 474: /** 475: * Gets the policy on whether to continue executing existing 476: * periodic tasks even when this executor has been 477: * <tt>shutdown</tt>. In this case, these tasks will only 478: * terminate upon <tt>shutdownNow</tt> or after setting the policy 479: * to <tt>false</tt> when already shutdown. This value is by 480: * default false. 481: * 482: * @return true if will continue after shutdown 483: * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy 484: */ 485: public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { 486: return continueExistingPeriodicTasksAfterShutdown; 487: } 488: 489: /** 490: * Sets the policy on whether to execute existing delayed 491: * tasks even when this executor has been <tt>shutdown</tt>. In 492: * this case, these tasks will only terminate upon 493: * <tt>shutdownNow</tt>, or after setting the policy to 494: * <tt>false</tt> when already shutdown. This value is by default 495: * true. 496: * 497: * @param value if true, execute after shutdown, else don't. 498: * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy 499: */ 500: public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { 501: executeExistingDelayedTasksAfterShutdown = value; 502: if (!value && isShutdown()) 503: cancelUnwantedTasks(); 504: } 505: 506: /** 507: * Gets the policy on whether to execute existing delayed 508: * tasks even when this executor has been <tt>shutdown</tt>. In 509: * this case, these tasks will only terminate upon 510: * <tt>shutdownNow</tt>, or after setting the policy to 511: * <tt>false</tt> when already shutdown. This value is by default 512: * true. 513: * 514: * @return true if will execute after shutdown 515: * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy 516: */ 517: public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { 518: return executeExistingDelayedTasksAfterShutdown; 519: } 520: 521: 522: /** 523: * Initiates an orderly shutdown in which previously submitted 524: * tasks are executed, but no new tasks will be accepted. If the 525: * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has 526: * been set <tt>false</tt>, existing delayed tasks whose delays 527: * have not yet elapsed are cancelled. And unless the 528: * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has 529: * been set <tt>true</tt>, future executions of existing periodic 530: * tasks will be cancelled. 531: */ 532: public void shutdown() { 533: cancelUnwantedTasks(); 534: super.shutdown(); 535: } 536: 537: /** 538: * Attempts to stop all actively executing tasks, halts the 539: * processing of waiting tasks, and returns a list of the tasks 540: * that were awaiting execution. 541: * 542: * <p>There are no guarantees beyond best-effort attempts to stop 543: * processing actively executing tasks. This implementation 544: * cancels tasks via {@link Thread#interrupt}, so any task that 545: * fails to respond to interrupts may never terminate. 546: * 547: * @return list of tasks that never commenced execution. Each 548: * element of this list is a {@link ScheduledFuture}, 549: * including those tasks submitted using <tt>execute</tt>, which 550: * are for scheduling purposes used as the basis of a zero-delay 551: * <tt>ScheduledFuture</tt>. 552: * @throws SecurityException {@inheritDoc} 553: */ 554: public List<Runnable> shutdownNow() { 555: return super.shutdownNow(); 556: } 557: 558: /** 559: * Returns the task queue used by this executor. Each element of 560: * this queue is a {@link ScheduledFuture}, including those 561: * tasks submitted using <tt>execute</tt> which are for scheduling 562: * purposes used as the basis of a zero-delay 563: * <tt>ScheduledFuture</tt>. Iteration over this queue is 564: * <em>not</em> guaranteed to traverse tasks in the order in 565: * which they will execute. 566: * 567: * @return the task queue 568: */ 569: public BlockingQueue<Runnable> getQueue() { 570: return super.getQueue(); 571: } 572: 573: /** 574: * An annoying wrapper class to convince javac to use a 575: * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable> 576: */ 577: private static class DelayedWorkQueue 578: extends AbstractCollection<Runnable> 579: implements BlockingQueue<Runnable> { 580: 581: private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>(); 582: public Runnable poll() { return dq.poll(); } 583: public Runnable peek() { return dq.peek(); } 584: public Runnable take() throws InterruptedException { return dq.take(); } 585: public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 586: return dq.poll(timeout, unit); 587: } 588: 589: public boolean add(Runnable x) { 590: return dq.add((RunnableScheduledFuture)x); 591: } 592: public boolean offer(Runnable x) { 593: return dq.offer((RunnableScheduledFuture)x); 594: } 595: public void put(Runnable x) { 596: dq.put((RunnableScheduledFuture)x); 597: } 598: public boolean offer(Runnable x, long timeout, TimeUnit unit) { 599: return dq.offer((RunnableScheduledFuture)x, timeout, unit); 600: } 601: 602: public Runnable remove() { return dq.remove(); } 603: public Runnable element() { return dq.element(); } 604: public void clear() { dq.clear(); } 605: public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); } 606: public int drainTo(Collection<? super Runnable> c, int maxElements) { 607: return dq.drainTo(c, maxElements); 608: } 609: 610: public int remainingCapacity() { return dq.remainingCapacity(); } 611: public boolean remove(Object x) { return dq.remove(x); } 612: public boolean contains(Object x) { return dq.contains(x); } 613: public int size() { return dq.size(); } 614: public boolean isEmpty() { return dq.isEmpty(); } 615: public Object[] toArray() { return dq.toArray(); } 616: public <T> T[] toArray(T[] array) { return dq.toArray(array); } 617: public Iterator<Runnable> iterator() { 618: return new Iterator<Runnable>() { 619: private Iterator<RunnableScheduledFuture> it = dq.iterator(); 620: public boolean hasNext() { return it.hasNext(); } 621: public Runnable next() { return it.next(); } 622: public void remove() { it.remove(); } 623: }; 624: } 625: } 626: }
GNU Classpath (0.98) |