GNU Classpath (0.98)
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;

/**
 * An {@link ExecutorService} that executes each submitted task using
 * one of possibly several pooled threads, normally configured
 * using {@link Executors} factory methods.
 *
 * <p>Thread pools address two different problems: they usually
 * provide improved performance when executing large numbers of
 * asynchronous tasks, due to reduced per-task invocation overhead,
 * and they provide a means of bounding and managing the resources,
 * including threads, consumed when executing a collection of tasks.
 * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
 * statistics, such as the number of completed tasks.
 *
 * <p>To be useful across a wide range of contexts, this class
 * provides many adjustable parameters and extensibility
 * hooks. However, programmers are urged to use the more convenient
 * {@link Executors} factory methods {@link
 * Executors#newCachedThreadPool} (unbounded thread pool, with
 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
 * (fixed size thread pool) and {@link
 * Executors#newSingleThreadExecutor} (single background thread), that
 * preconfigure settings for the most common usage
 * scenarios.
 * Otherwise, use the following guide when manually
 * configuring and tuning this class:
 *
 * <dl>
 *
 * <dt>Core and maximum pool sizes</dt>
 *
 * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
 * pool size
 * (see {@link ThreadPoolExecutor#getPoolSize})
 * according to the bounds set by corePoolSize
 * (see {@link ThreadPoolExecutor#getCorePoolSize})
 * and
 * maximumPoolSize
 * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
 * When a new task is submitted in method {@link
 * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
 * are running, a new thread is created to handle the request, even if
 * other worker threads are idle.  If there are more than
 * corePoolSize but fewer than maximumPoolSize threads running, a new
 * thread will be created only if the queue is full.  By setting
 * corePoolSize and maximumPoolSize the same, you create a fixed-size
 * thread pool. By setting maximumPoolSize to an essentially unbounded
 * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
 * accommodate an arbitrary number of concurrent tasks. Most typically,
 * core and maximum pool sizes are set only upon construction, but they
 * may also be changed dynamically using {@link
 * ThreadPoolExecutor#setCorePoolSize} and {@link
 * ThreadPoolExecutor#setMaximumPoolSize}. </dd>
 *
 * <dt>On-demand construction</dt>
 *
 * <dd> By default, even core threads are initially created and
 * started only when new tasks arrive, but this can be overridden
 * dynamically using method {@link
 * ThreadPoolExecutor#prestartCoreThread} or
 * {@link ThreadPoolExecutor#prestartAllCoreThreads}.
 * You probably want to prestart threads if you construct the
 * pool with a non-empty queue. </dd>
 *
 * <dt>Creating new threads</dt>
 *
 * <dd>New threads are created using a {@link
 * java.util.concurrent.ThreadFactory}.
 * If not otherwise specified, a
 * {@link Executors#defaultThreadFactory} is used, which creates threads
 * that are all in the same {@link ThreadGroup} and have the same
 * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
 * a different ThreadFactory, you can alter the thread's name, thread
 * group, priority, daemon status, etc. If a <tt>ThreadFactory</tt> fails
 * to create a thread when asked, by returning null from
 * <tt>newThread</tt>, the executor will continue, but might
 * not be able to execute any tasks. </dd>
 *
 * <dt>Keep-alive times</dt>
 *
 * <dd>If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {@link
 * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
 * reducing resource consumption when the pool is not being actively
 * used. If the pool becomes more active later, new threads will be
 * constructed. This parameter can also be changed dynamically using
 * method {@link ThreadPoolExecutor#setKeepAliveTime}. Using a value
 * of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS} effectively
 * prevents idle threads from ever terminating prior to shut down. By
 * default, the keep-alive policy applies only when there are more
 * than corePoolSize threads. But method {@link
 * ThreadPoolExecutor#allowCoreThreadTimeOut} can be used to apply
 * this time-out policy to core threads as well, so long as
 * the keepAliveTime value is non-zero. </dd>
 *
 * <dt>Queuing</dt>
 *
 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
 * submitted tasks.
 * The use of this queue interacts with pool sizing:
 *
 * <ul>
 *
 * <li> If fewer than corePoolSize threads are running, the Executor
 * always prefers adding a new thread
 * rather than queuing.</li>
 *
 * <li> If corePoolSize or more threads are running, the Executor
 * always prefers queuing a request rather than adding a new
 * thread.</li>
 *
 * <li> If a request cannot be queued, a new thread is created unless
 * this would exceed maximumPoolSize, in which case, the task will be
 * rejected.</li>
 *
 * </ul>
 *
 * There are three general strategies for queuing:
 * <ol>
 *
 * <li> <em> Direct handoffs.</em> A good default choice for a work
 * queue is a {@link SynchronousQueue} that hands off tasks to threads
 * without otherwise holding them. Here, an attempt to queue a task
 * will fail if no threads are immediately available to run it, so a
 * new thread will be constructed. This policy avoids lockups when
 * handling sets of requests that might have internal dependencies.
 * Direct handoffs generally require unbounded maximumPoolSizes to
 * avoid rejection of newly submitted tasks. This in turn admits the
 * possibility of unbounded thread growth when commands continue to
 * arrive on average faster than they can be processed.  </li>
 *
 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
 * example a {@link LinkedBlockingQueue} without a predefined
 * capacity) will cause new tasks to wait in the queue when all
 * corePoolSize threads are busy. Thus, no more than corePoolSize
 * threads will ever be created. (And the value of the maximumPoolSize
 * therefore doesn't have any effect.)  This may be appropriate when
 * each task is completely independent of others, so tasks cannot
 * affect each other's execution; for example, in a web page server.
 * While this style of queuing can be useful in smoothing out
 * transient bursts of requests, it admits the possibility of
 * unbounded work queue growth when commands continue to arrive on
 * average faster than they can be processed.  </li>
 *
 * <li><em>Bounded queues.</em> A bounded queue (for example, an
 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 * used with finite maximumPoolSizes, but can be more difficult to
 * tune and control.  Queue sizes and maximum pool sizes may be traded
 * off for each other: Using large queues and small pools minimizes
 * CPU usage, OS resources, and context-switching overhead, but can
 * lead to artificially low throughput.  If tasks frequently block (for
 * example if they are I/O bound), a system may be able to schedule
 * time for more threads than you otherwise allow. Use of small queues
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput.  </li>
 *
 * </ol>
 *
 * </dd>
 *
 * <dt>Rejected tasks</dt>
 *
 * <dd> New tasks submitted in method {@link
 * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
 * Executor has been shut down, and also when the Executor uses finite
 * bounds for both maximum threads and work queue capacity, and is
 * saturated.  In either case, the <tt>execute</tt> method invokes the
 * {@link RejectedExecutionHandler#rejectedExecution} method of its
 * {@link RejectedExecutionHandler}.  Four predefined handler policies
 * are provided:
 *
 * <ol>
 *
 * <li> In the
 * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
 * runtime {@link RejectedExecutionException} upon rejection. </li>
 *
 * <li> In {@link
 * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
 * <tt>execute</tt> itself runs the task. This provides a simple
 * feedback control mechanism that will slow down the rate at which new
 * tasks are submitted. </li>
 *
 * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
 * a task that cannot be executed is simply dropped.  </li>
 *
 * <li>In {@link
 * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
 * shut down, the task at the head of the work queue is dropped, and
 * then execution is retried (which can fail again, causing this to be
 * repeated). </li>
 *
 * </ol>
 *
 * It is possible to define and use other kinds of {@link
 * RejectedExecutionHandler} classes. Doing so requires some care,
 * especially when policies are designed to work only under particular
 * capacity or queuing policies. </dd>
 *
 * <dt>Hook methods</dt>
 *
 * <dd>This class provides <tt>protected</tt> overridable {@link
 * ThreadPoolExecutor#beforeExecute} and {@link
 * ThreadPoolExecutor#afterExecute} methods that are called before and
 * after execution of each task.  These can be used to manipulate the
 * execution environment; for example, reinitializing ThreadLocals,
 * gathering statistics, or adding log entries. Additionally, method
 * {@link ThreadPoolExecutor#terminated} can be overridden to perform
 * any special processing that needs to be done once the Executor has
 * fully terminated.
 *
 * <p>If hook or callback methods throw
 * exceptions, internal worker threads may in turn fail and
 * abruptly terminate.</dd>
 *
 * <dt>Queue maintenance</dt>
 *
 * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
 * the work queue for purposes of monitoring and debugging.  Use of
 * this method for any other purpose is strongly discouraged.
 * Two supplied methods, {@link ThreadPoolExecutor#remove} and {@link
 * ThreadPoolExecutor#purge} are available to assist in storage
 * reclamation when large numbers of queued tasks become
 * cancelled.</dd>
 *
 * <dt>Finalization</dt>
 *
 * <dd> A pool that is no longer referenced in a program <em>AND</em>
 * has no remaining threads will be <tt>shutdown</tt>
 * automatically. If you would like to ensure that unreferenced pools
 * are reclaimed even if users forget to call {@link
 * ThreadPoolExecutor#shutdown}, then you must arrange that unused
 * threads eventually die, by setting appropriate keep-alive times,
 * using a lower bound of zero core threads and/or setting {@link
 * ThreadPoolExecutor#allowCoreThreadTimeOut}.  </dd> </dl>
 *
 * <p> <b>Extension example</b>. Most extensions of this class
 * override one or more of the protected hook methods. For example,
 * here is a subclass that adds a simple pause/resume feature:
 *
 * <pre>
 * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 *   private boolean isPaused;
 *   private ReentrantLock pauseLock = new ReentrantLock();
 *   private Condition unpaused = pauseLock.newCondition();
 *
 *   public PausableThreadPoolExecutor(...) { super(...); }
 *
 *   protected void beforeExecute(Thread t, Runnable r) {
 *     super.beforeExecute(t, r);
 *     pauseLock.lock();
 *     try {
 *       while (isPaused) unpaused.await();
 *     } catch (InterruptedException ie) {
 *       t.interrupt();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void pause() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = true;
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void resume() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = false;
 *       unpaused.signalAll();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 * }
 * </pre>
 * @since 1.5
 * @author Doug Lea
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * Only used to force toArray() to produce a Runnable[].
     */
    private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];

    /**
     * Permission for checking shutdown
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /**
     * Queue used for holding tasks and handing off to worker threads.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
     * workers set.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Set containing all worker threads in pool.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout only when there are more than
     * corePoolSize present. Otherwise they wait forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default) core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size, updated only while holding mainLock,
     * but volatile to allow concurrent readability even
     * during updates.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size, updated only while holding mainLock
     * but volatile to allow concurrent readability even
     * during updates.
     */
    private volatile int maximumPoolSize;

    /**
     * Current pool size, updated only while holding mainLock
     * but volatile to allow concurrent readability even
     * during updates.
     */
    private volatile int poolSize;

    /**
     * Lifecycle state
     */
    volatile int runState;

    // Special values for runState
    /** Normal, not-shutdown mode */
    static final int RUNNING    = 0;
    /** Controlled shutdown mode */
    static final int SHUTDOWN   = 1;
    /** Immediate shutdown mode */
    static final int STOP       = 2;
    /** Final state */
    static final int TERMINATED = 3;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Factory for new threads.
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Tracks largest attained pool size.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads.
     */
    private long completedTaskCount;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * Invokes the rejected execution handler for the given command.
     */
    void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

    /**
     * Creates and returns a new thread running firstTask as its first
     * task. Call only while holding mainLock.
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return the new thread, or null if threadFactory fails to create thread
     */
    private Thread addThread(Runnable firstTask) {
        if (runState == TERMINATED) // Don't create thread if terminated
            return null;
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

    /**
     * Creates and starts a new thread running firstTask as its first
     * task, only if fewer than corePoolSize threads are running.
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return true if successful.
     */
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

    /**
     * Creates and starts a new thread only if fewer than maximumPoolSize
     * threads are running.  The new thread runs as its first task the
     * next task in queue, or if there is none, the given task.
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return 0 if a new thread cannot be created, a positive number
     * if firstTask will be run in a new thread, or a negative number
     * if a new thread was created but is running some other task, in
     * which case the caller must try some other way to run firstTask
     * (perhaps by calling this method again).
     */
    private int addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        int status = 0;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize) {
                Runnable next = workQueue.poll();
                if (next == null) {
                    next = firstTask;
                    status = 1;
                } else
                    status = -1;
                t = addThread(next);
            }
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return 0;
        t.start();
        return status;
    }


    /**
     * Gets the next task for a worker thread to run.
     * @return the task
     */
    Runnable getTask() {
        for (;;) {
            try {
                switch (runState) {
                case RUNNING: {
                    // untimed wait if core and not allowing core timeout
                    if (poolSize <= corePoolSize && !allowCoreThreadTimeOut)
                        return workQueue.take();

                    long timeout = keepAliveTime;
                    if (timeout <= 0) // die immediately for 0 timeout
                        return null;
                    Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
                    if (r != null)
                        return r;
                    if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                        return null; // timed out
                    // Else, after timeout, the pool shrank. Retry
                    break;
                }

                case SHUTDOWN: {
                    // Help drain queue
                    Runnable r = workQueue.poll();
                    if (r != null)
                        return r;

                    // Check if can terminate
                    if (workQueue.isEmpty()) {
                        interruptIdleWorkers();
                        return null;
                    }

                    // Else there could still be delayed tasks in queue.
                    return workQueue.take();
                }

                case STOP:
                    return null;
                default:
                    assert false;
                }
            } catch (InterruptedException ie) {
                // On interruption, re-check runstate
            }
        }
    }

    /**
     * Wakes up all threads that might be waiting for tasks.
     */
    void interruptIdleWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfIdle();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Performs bookkeeping for a terminated worker thread.
     * @param w the worker
     */
    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize > 0)
                return;

            // Else, this is the last thread. Deal with potential shutdown.

            int state = runState;
            assert state != TERMINATED;

            if (state != STOP) {
                // If there are queued tasks but no threads, create
                // replacement thread. We must create it initially
                // idle to avoid orphaned tasks in case addThread
                // fails.  This also handles case of delayed tasks
                // that will sometime later become runnable.
                if (!workQueue.isEmpty()) {
                    Thread t = addThread(null);
                    if (t != null)
                        t.start();
                    return;
                }

                // Otherwise, we can exit without replacement
                if (state == RUNNING)
                    return;
            }

            // Either state is STOP, or state is SHUTDOWN and there is
            // no work to do. So we can terminate.
            termination.signalAll();
            runState = TERMINATED;
            // fall through to call terminated() outside of lock.
        } finally {
            mainLock.unlock();
        }

        assert runState == TERMINATED;
        terminated();
    }

    /**
     * Worker threads
     */
    private class Worker implements Runnable {

        /**
         * The runLock is acquired and released surrounding each task
         * execution. It mainly protects against interrupts that are
         * intended to cancel the worker thread from instead
         * interrupting the task being run.
         */
        private final ReentrantLock runLock = new ReentrantLock();

        /**
         * Initial task to run before entering run loop
         */
        private Runnable firstTask;

        /**
         * Per thread completed task counter; accumulated
         * into completedTaskCount upon termination.
         */
        volatile long completedTasks;

        /**
         * Thread this worker is running in.  Acts as a final field,
         * but cannot be set until thread is created.
         */
        Thread thread;

        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
        }

        boolean isActive() {
            return runLock.isLocked();
        }

        /**
         * Interrupts thread if not running a task.
         */
        void interruptIfIdle() {
            final ReentrantLock runLock = this.runLock;
            if (runLock.tryLock()) {
                try {
                    thread.interrupt();
                } finally {
                    runLock.unlock();
                }
            }
        }

        /**
         * Interrupts thread even if running a task.
         */
        void interruptNow() {
            thread.interrupt();
        }

        /**
         * Runs a single task between before/after methods.
         */
        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                // If not shutting down then clear an outstanding interrupt.
                if (runState != STOP &&
                    Thread.interrupted() &&
                    runState == STOP) // Re-interrupt if stopped after clearing
                    thread.interrupt();
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    // Else the exception occurred within
                    // afterExecute itself in which case we don't
                    // want to call it again.
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

        /**
         * Main run loop
         */
        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null; // unnecessary but can help GC
                }
            } finally {
                workerDone(this);
            }
        }
    }

    // Public methods

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, if maximumPoolSize is less than
     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt> is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters and default rejected execution handler.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, if maximumPoolSize is less than
     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt>
     * or <tt>threadFactory</tt> is null.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @param handler the handler to use when execution is blocked
     * because the thread bounds and queue capacities are reached.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, if maximumPoolSize is less than
     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt>
     * or <tt>handler</tt> is null.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @param handler the handler to use when execution is blocked
     * because the thread bounds and queue capacities are reached.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, if maximumPoolSize is less than
     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt>
     * or <tt>threadFactory</tt> or <tt>handler</tt> is null.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }


    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shut down or because its capacity has been reached,
     * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
     * for execution
     * @throws NullPointerException if command is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        for (;;) {
            if (runState != RUNNING) {
                reject(command);
                return;
            }
            if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                return;
            if (workQueue.offer(command))
                return;
            int status = addIfUnderMaximumPoolSize(command);
            if (status > 0)      // created new thread
                return;
            if (status == 0) {   // failed to create thread
                reject(command);
                return;
            }
            // Retry if created a new thread but it is busy with another task
        }
    }

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be
     * accepted. Invocation has no additional effect if already shut
     * down.
     * @throws SecurityException if a security manager exists and
     * shutting down this ExecutorService may manipulate threads that
     * the caller is not permitted to modify because it does not hold
     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
     * or the security manager's <tt>checkAccess</tt> method denies access.
     */
    public void shutdown() {
        // Fail if caller doesn't have modifyThread permission.
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            security.checkPermission(shutdownPerm);

        boolean fullyTerminated = false;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (workers.size() > 0) {
                // Check if caller can modify worker threads.
                // This might not be true even if passed above check, if
                // the SecurityManager treats some threads specially.
                if (security != null) {
                    for (Worker w: workers)
                        security.checkAccess(w.thread);
                }

                int state = runState;
                if (state == RUNNING) // don't override shutdownNow
                    runState = SHUTDOWN;

                try {
                    for (Worker w: workers)
                        w.interruptIfIdle();
                } catch (SecurityException se) {
                    // If SecurityManager allows above checks, but
                    // then unexpectedly throws exception when
                    // interrupting threads (which it ought not do),
                    // back out as cleanly as we can. Some threads may
                    // have been killed but we remain in non-shutdown
                    // state.
                    runState = state;
                    throw se;
                }
            }
            else { // If no workers, trigger full termination now
                fullyTerminated = true;
                runState = TERMINATED;
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
        if (fullyTerminated)
            terminated();
    }


    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks. This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     * shutting down this ExecutorService may manipulate threads that
     * the caller is not permitted to modify because it does not hold
     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
     * or the security manager's <tt>checkAccess</tt> method denies access.
     */
    public List<Runnable> shutdownNow() {
        // Almost the same code as shutdown()
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            security.checkPermission(shutdownPerm);

        boolean fullyTerminated = false;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (workers.size() > 0) {
                if (security != null) {
                    for (Worker w: workers)
                        security.checkAccess(w.thread);
                }

                int state = runState;
                if (state != TERMINATED)
                    runState = STOP;
                try {
                    for (Worker w : workers)
                        w.interruptNow();
                } catch (SecurityException se) {
                    runState = state; // back out;
                    throw se;
                }
            }
            else { // If no workers, trigger full termination now
                fullyTerminated = true;
                runState = TERMINATED;
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
        if (fullyTerminated)
            terminated();
        return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
    }

    public boolean isShutdown() {
        return runState != RUNNING;
    }

    /**
     * Returns true if this executor is in the process of terminating
     * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
     * completely terminated. This method may be useful for
     * debugging. A return of <tt>true</tt> reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, causing this executor not
     * to properly terminate.
     * @return true if terminating but not yet terminated.
     */
    public boolean isTerminating() {
        return runState == STOP;
    }

    public boolean isTerminated() {
        return runState == TERMINATED;
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runState == TERMINATED)
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Invokes <tt>shutdown</tt> when this executor is no longer
     * referenced.
     */
    protected void finalize() {
        shutdown();
    }

    /**
     * Sets the thread factory used to create new threads.
     *
     * @param threadFactory the new thread factory
     * @throws NullPointerException if threadFactory is null
     * @see #getThreadFactory
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null)
            throw new NullPointerException();
        this.threadFactory = threadFactory;
    }

    /**
     * Returns the thread factory used to create new threads.
     *
     * @return the current thread factory
     * @see #setThreadFactory
     */
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    /**
     * Sets a new handler for unexecutable tasks.
     *
     * @param handler the new handler
     * @throws NullPointerException if handler is null
     * @see #getRejectedExecutionHandler
     */
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        if (handler == null)
            throw new NullPointerException();
        this.handler = handler;
    }

    /**
     * Returns the current handler for unexecutable tasks.
     *
     * @return the current handler
     * @see #setRejectedExecutionHandler
     */
    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return handler;
    }

    /**
     * Returns the task queue used by this executor. Access to the
     * task queue is intended primarily for debugging and monitoring.
     * This queue may be in active use. Retrieving the task queue
     * does not prevent queued tasks from executing.
     *
     * @return the task queue
     */
    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

    /**
     * Removes this task from the executor's internal queue if it is
     * present, thus causing it not to be run if it has not already
     * started.
     *
     * <p> This method may be useful as one part of a cancellation
     * scheme. It may fail to remove tasks that have been converted
     * into other forms before being placed on the internal queue. For
     * example, a task entered using <tt>submit</tt> might be
     * converted into a form that maintains <tt>Future</tt> status.
     * However, in such cases, method {@link ThreadPoolExecutor#purge}
     * may be used to remove those Futures that have been cancelled.
     *
     * @param task the task to remove
     * @return true if the task was removed
     */
    public boolean remove(Runnable task) {
        return getQueue().remove(task);
    }


    /**
     * Tries to remove from the work queue all {@link Future}
     * tasks that have been cancelled. This method can be useful as a
     * storage reclamation operation, that has no other impact on
     * functionality. Cancelled tasks are never executed, but may
     * accumulate in work queues until worker threads can actively
     * remove them. Invoking this method instead tries to remove them now.
     * However, this method may fail to remove tasks in
     * the presence of interference by other threads.
     */
    public void purge() {
        // Fail if we encounter interference during traversal
        try {
            Iterator<Runnable> it = getQueue().iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?>) {
                    Future<?> c = (Future<?>)r;
                    if (c.isCancelled())
                        it.remove();
                }
            }
        }
        catch (ConcurrentModificationException ex) {
            return;
        }
    }

    /**
     * Sets the core number of threads. This overrides any value set
     * in the constructor. If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle. If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if <tt>corePoolSize</tt>
     * is less than zero
     * @see #getCorePoolSize
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int extra = this.corePoolSize - corePoolSize;
            this.corePoolSize = corePoolSize;
            if (extra < 0) {
                int n = workQueue.size();
                // We have to create initially-idle threads here
                // because we otherwise have no recourse about
                // what to do with a dequeued task if addThread fails.
                while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {
                    Thread t = addThread(null);
                    if (t != null)
                        t.start();
                    else
                        break;
                }
            }
            else if (extra > 0 && poolSize > corePoolSize) {
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() &&
                       extra-- > 0 &&
                       poolSize > corePoolSize &&
                       workQueue.remainingCapacity() == 0)
                    it.next().interruptIfIdle();
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the core number of threads.
     *
     * @return the core number of threads
     * @see #setCorePoolSize
     */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /**
     * Starts a core thread, causing it to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed. This method will return <tt>false</tt>
     * if all core threads have already been started.
     * @return true if a thread was started
     */
    public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }

    /**
     * Starts all core threads, causing them to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed.
     * @return the number of threads started.
     */
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))
            ++n;
        return n;
    }

    /**
     * Returns true if this pool allows core threads to time out and
     * terminate if no tasks arrive within the keepAlive time, being
     * replaced if needed when new tasks arrive. When true, the same
     * keep-alive policy applying to non-core threads applies also to
     * core threads. When false (the default), core threads are never
     * terminated due to lack of incoming tasks.
     * @return <tt>true</tt> if core threads are allowed to time out,
     * else <tt>false</tt>
     *
     * @since 1.6
     */
    public boolean allowsCoreThreadTimeOut() {
        return allowCoreThreadTimeOut;
    }

    /**
     * Sets the policy governing whether core threads may time out and
     * terminate if no tasks arrive within the keep-alive time, being
     * replaced if needed when new tasks arrive. When false, core
     * threads are never terminated due to lack of incoming
     * tasks. When true, the same keep-alive policy applying to
     * non-core threads applies also to core threads. To avoid
     * continual thread replacement, the keep-alive time must be
     * greater than zero when setting <tt>true</tt>. This method
     * should in general be called before the pool is actively used.
     * @param value <tt>true</tt> if should time out, else <tt>false</tt>
     * @throws IllegalArgumentException if value is <tt>true</tt>
     * and the current keep-alive time is not greater than zero.
     *
     * @since 1.6
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");

        allowCoreThreadTimeOut = value;
    }

    /**
     * Sets the maximum allowed number of threads. This overrides any
     * value set in the constructor. If the new value is smaller than
     * the current value, excess existing threads will be
     * terminated when they next become idle.
     *
     * @param maximumPoolSize the new maximum
     * @throws IllegalArgumentException if the new maximum is
     *         less than or equal to zero, or
     *         less than the {@linkplain #getCorePoolSize core pool size}
     * @see #getMaximumPoolSize
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int extra = this.maximumPoolSize - maximumPoolSize;
            this.maximumPoolSize = maximumPoolSize;
            if (extra > 0 && poolSize > maximumPoolSize) {
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() &&
                       extra > 0 &&
                       poolSize > maximumPoolSize) {
                    it.next().interruptIfIdle();
                    --extra;
                }
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the maximum allowed number of threads.
     *
     * @return the maximum allowed number of threads
     * @see #setMaximumPoolSize
     */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /**
     * Sets the time limit for which threads may remain idle before
     * being terminated. If there are more than the core number of
     * threads currently in the pool, after waiting this amount of
     * time without processing a task, excess threads will be
     * terminated. This overrides any value set in the constructor.
     * @param time the time to wait. A time value of zero will cause
     * excess threads to terminate immediately after executing tasks.
     * @param unit the time unit of the time argument
     * @throws IllegalArgumentException if time is less than zero, or
     * if time is zero and allowsCoreThreadTimeOut
     * @see #getKeepAliveTime
     */
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        this.keepAliveTime = unit.toNanos(time);
    }

    /**
     * Returns the thread keep-alive time, which is the amount of time
     * which threads in excess of the core pool size may remain
     * idle before being terminated.
     *
     * @param unit the desired time unit of the result
     * @return the time limit
     * @see #setKeepAliveTime
     */
    public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

    /* Statistics */

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize() {
        return poolSize;
    }

    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers) {
                if (w.isActive())
                    ++n;
            }
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the largest number of threads that have ever
     * simultaneously been in the pool.
     *
     * @return the number of threads
     */
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation, but one that does not ever
     * decrease across successive calls.
     *
     * @return the number of tasks
     */
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isActive())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Method invoked prior to executing the given Runnable in the
     * given thread. This method is invoked by thread <tt>t</tt> that
     * will execute task <tt>r</tt>, and may be used to re-initialize
     * ThreadLocals, or to perform logging.
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke <tt>super.beforeExecute</tt> at the end of
     * this method.
     *
     * @param t the thread that will run task r.
     * @param r the task that will be executed.
     */
    protected void beforeExecute(Thread t, Runnable r) { }

    /**
     * Method invoked upon completion of execution of the given Runnable.
     * This method is invoked by the thread that executed the task. If
     * non-null, the Throwable is the uncaught <tt>RuntimeException</tt>
     * or <tt>Error</tt> that caused execution to terminate abruptly.
     *
     * <p><b>Note:</b> When actions are enclosed in tasks (such as
     * {@link FutureTask}) either explicitly or via methods such as
     * <tt>submit</tt>, these task objects catch and maintain
     * computational exceptions, and so they do not cause abrupt
     * termination, and the internal exceptions are <em>not</em>
     * passed to this method.
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke <tt>super.afterExecute</tt> at the
     * beginning of this method.
     *
     * @param r the runnable that has completed.
     * @param t the exception that caused termination, or null if
     * execution completed normally.
     */
    protected void afterExecute(Runnable r, Throwable t) { }

    /**
     * Method invoked when the Executor has terminated. Default
     * implementation does nothing. Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     * <tt>super.terminated</tt> within this method.
     */
    protected void terminated() { }

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the <tt>execute</tt> method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>CallerRunsPolicy</tt>.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * <tt>RejectedExecutionException</tt>.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an <tt>AbortPolicy</tt>.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException();
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>DiscardPolicy</tt>.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries <tt>execute</tt>, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>DiscardOldestPolicy</tt>.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}
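
The listing above defines the seven-argument constructor, `execute`, the `shutdown`/`awaitTermination` pair and the `CallerRunsPolicy` handler. The following is a minimal usage sketch of how those pieces might fit together from client code; it is not part of the Classpath source. The class name `PoolSketch`, the method `runTasks`, and the pool and queue sizes are hypothetical choices made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client class; not part of java.util.concurrent.
public class PoolSketch {

    /** Submits taskCount trivial tasks and returns how many actually ran. */
    static int runTasks(int taskCount) {
        final AtomicInteger ran = new AtomicInteger();

        // Assumed sizes: 2 core threads, at most 4 threads, bounded queue of 8.
        // CallerRunsPolicy runs a rejected task in the submitting thread, so
        // no task is dropped while the pool is still running.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 4, 30L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(8),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < taskCount; i++) {
            pool.execute(new Runnable() {
                public void run() { ran.incrementAndGet(); }
            });
        }

        // Orderly shutdown: already-queued tasks still execute.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS))
                pool.shutdownNow();   // give up and interrupt the workers
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + runTasks(100));
    }
}
```

With `CallerRunsPolicy`, a full queue slows the submitter down instead of discarding work, which is why every submitted task is counted here. Substituting `AbortPolicy` would make `execute` throw `RejectedExecutionException` once both the queue and the maximum pool size are exhausted.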