Source for java.util.concurrent.AbstractExecutorService

   1: /*
   2:  * Written by Doug Lea with assistance from members of JCP JSR-166
   3:  * Expert Group and released to the public domain, as explained at
   4:  * http://creativecommons.org/licenses/publicdomain
   5:  */
   6: 
   7: package java.util.concurrent;
   8: import java.util.*;
   9: 
  10: /**
  11:  * Provides default implementations of {@link ExecutorService}
  12:  * execution methods. This class implements the <tt>submit</tt>,
  13:  * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
  14:  * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
  15:  * to the {@link FutureTask} class provided in this package.  For example,
  16:  * the implementation of <tt>submit(Runnable)</tt> creates an
  17:  * associated <tt>RunnableFuture</tt> that is executed and
  18:  * returned. Subclasses may override the <tt>newTaskFor</tt> methods
  19:  * to return <tt>RunnableFuture</tt> implementations other than
  20:  * <tt>FutureTask</tt>.
  21:  *
  22:  * <p> <b>Extension example</b>. Here is a sketch of a class
  23:  * that customizes {@link ThreadPoolExecutor} to use
  24:  * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
  25:  * <pre>
  26:  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
  27:  *
  28:  *   static class CustomTask&lt;V&gt; implements RunnableFuture&lt;V&gt; {...}
  29:  *
  30:  *   protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Callable&lt;V&gt; c) {
  31:  *       return new CustomTask&lt;V&gt;(c);
  32:  *   }
  33:  *   protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Runnable r, V v) {
  34:  *       return new CustomTask&lt;V&gt;(r, v);
  35:  *   }
  36:  *   // ... add constructors, etc.
  37:  * }
  38:  * </pre>
  39:  * @since 1.5
  40:  * @author Doug Lea
  41:  */
  42: public abstract class AbstractExecutorService implements ExecutorService {
  43: 
  44:     /**
  45:      * Returns a <tt>RunnableFuture</tt> for the given runnable and default
  46:      * value.
  47:      *
  48:      * @param runnable the runnable task being wrapped
  49:      * @param value the default value for the returned future
  50:      * @return a <tt>RunnableFuture</tt> which when run will run the
  51:      * underlying runnable and which, as a <tt>Future</tt>, will yield
  52:      * the given value as its result and provide for cancellation of
  53:      * the underlying task.
  54:      * @since 1.6
  55:      */
  56:     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  57:         return new FutureTask<T>(runnable, value);
  58:     }
  59: 
  60:     /**
  61:      * Returns a <tt>RunnableFuture</tt> for the given callable task.
  62:      *
  63:      * @param callable the callable task being wrapped
  64:      * @return a <tt>RunnableFuture</tt> which when run will call the
  65:      * underlying callable and which, as a <tt>Future</tt>, will yield
  66:      * the callable's result as its result and provide for
  67:      * cancellation of the underlying task.
  68:      * @since 1.6
  69:      */
  70:     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  71:         return new FutureTask<T>(callable);
  72:     }
  73: 
  74:     public Future<?> submit(Runnable task) {
  75:         if (task == null) throw new NullPointerException();
  76:         RunnableFuture<Object> ftask = newTaskFor(task, null);
  77:         execute(ftask);
  78:         return ftask;
  79:     }
  80: 
  81:     public <T> Future<T> submit(Runnable task, T result) {
  82:         if (task == null) throw new NullPointerException();
  83:         RunnableFuture<T> ftask = newTaskFor(task, result);
  84:         execute(ftask);
  85:         return ftask;
  86:     }
  87: 
  88:     public <T> Future<T> submit(Callable<T> task) {
  89:         if (task == null) throw new NullPointerException();
  90:         RunnableFuture<T> ftask = newTaskFor(task);
  91:         execute(ftask);
  92:         return ftask;
  93:     }
  94: 
  95:     /**
  96:      * the main mechanics of invokeAny.
  97:      */
  98:     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
  99:                             boolean timed, long nanos)
 100:         throws InterruptedException, ExecutionException, TimeoutException {
 101:         if (tasks == null)
 102:             throw new NullPointerException();
 103:         int ntasks = tasks.size();
 104:         if (ntasks == 0)
 105:             throw new IllegalArgumentException();
 106:         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
 107:         ExecutorCompletionService<T> ecs =
 108:             new ExecutorCompletionService<T>(this);
 109: 
 110:         // For efficiency, especially in executors with limited
 111:         // parallelism, check to see if previously submitted tasks are
 112:         // done before submitting more of them. This interleaving
 113:         // plus the exception mechanics account for messiness of main
 114:         // loop.
 115: 
 116:         try {
 117:             // Record exceptions so that if we fail to obtain any
 118:             // result, we can throw the last exception we got.
 119:             ExecutionException ee = null;
 120:             long lastTime = (timed)? System.nanoTime() : 0;
 121:             Iterator<? extends Callable<T>> it = tasks.iterator();
 122: 
 123:             // Start one task for sure; the rest incrementally
 124:             futures.add(ecs.submit(it.next()));
 125:             --ntasks;
 126:             int active = 1;
 127: 
 128:             for (;;) {
 129:                 Future<T> f = ecs.poll();
 130:                 if (f == null) {
 131:                     if (ntasks > 0) {
 132:                         --ntasks;
 133:                         futures.add(ecs.submit(it.next()));
 134:                         ++active;
 135:                     }
 136:                     else if (active == 0)
 137:                         break;
 138:                     else if (timed) {
 139:                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
 140:                         if (f == null)
 141:                             throw new TimeoutException();
 142:                         long now = System.nanoTime();
 143:                         nanos -= now - lastTime;
 144:                         lastTime = now;
 145:                     }
 146:                     else
 147:                         f = ecs.take();
 148:                 }
 149:                 if (f != null) {
 150:                     --active;
 151:                     try {
 152:                         return f.get();
 153:                     } catch (InterruptedException ie) {
 154:                         throw ie;
 155:                     } catch (ExecutionException eex) {
 156:                         ee = eex;
 157:                     } catch (RuntimeException rex) {
 158:                         ee = new ExecutionException(rex);
 159:                     }
 160:                 }
 161:             }
 162: 
 163:             if (ee == null)
 164:                 ee = new ExecutionException();
 165:             throw ee;
 166: 
 167:         } finally {
 168:             for (Future<T> f : futures)
 169:                 f.cancel(true);
 170:         }
 171:     }
 172: 
 173:     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 174:         throws InterruptedException, ExecutionException {
 175:         try {
 176:             return doInvokeAny(tasks, false, 0);
 177:         } catch (TimeoutException cannotHappen) {
 178:             assert false;
 179:             return null;
 180:         }
 181:     }
 182: 
 183:     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
 184:                            long timeout, TimeUnit unit)
 185:         throws InterruptedException, ExecutionException, TimeoutException {
 186:         return doInvokeAny(tasks, true, unit.toNanos(timeout));
 187:     }
 188: 
 189:     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 190:         throws InterruptedException {
 191:         if (tasks == null)
 192:             throw new NullPointerException();
 193:         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
 194:         boolean done = false;
 195:         try {
 196:             for (Callable<T> t : tasks) {
 197:                 RunnableFuture<T> f = newTaskFor(t);
 198:                 futures.add(f);
 199:                 execute(f);
 200:             }
 201:             for (Future<T> f : futures) {
 202:                 if (!f.isDone()) {
 203:                     try {
 204:                         f.get();
 205:                     } catch (CancellationException ignore) {
 206:                     } catch (ExecutionException ignore) {
 207:                     }
 208:                 }
 209:             }
 210:             done = true;
 211:             return futures;
 212:         } finally {
 213:             if (!done)
 214:                 for (Future<T> f : futures)
 215:                     f.cancel(true);
 216:         }
 217:     }
 218: 
 219:     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 220:                                          long timeout, TimeUnit unit)
 221:         throws InterruptedException {
 222:         if (tasks == null || unit == null)
 223:             throw new NullPointerException();
 224:         long nanos = unit.toNanos(timeout);
 225:         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
 226:         boolean done = false;
 227:         try {
 228:             for (Callable<T> t : tasks)
 229:                 futures.add(newTaskFor(t));
 230: 
 231:             long lastTime = System.nanoTime();
 232: 
 233:             // Interleave time checks and calls to execute in case
 234:             // executor doesn't have any/much parallelism.
 235:             Iterator<Future<T>> it = futures.iterator();
 236:             while (it.hasNext()) {
 237:                 execute((Runnable)(it.next()));
 238:                 long now = System.nanoTime();
 239:                 nanos -= now - lastTime;
 240:                 lastTime = now;
 241:                 if (nanos <= 0)
 242:                     return futures;
 243:             }
 244: 
 245:             for (Future<T> f : futures) {
 246:                 if (!f.isDone()) {
 247:                     if (nanos <= 0)
 248:                         return futures;
 249:                     try {
 250:                         f.get(nanos, TimeUnit.NANOSECONDS);
 251:                     } catch (CancellationException ignore) {
 252:                     } catch (ExecutionException ignore) {
 253:                     } catch (TimeoutException toe) {
 254:                         return futures;
 255:                     }
 256:                     long now = System.nanoTime();
 257:                     nanos -= now - lastTime;
 258:                     lastTime = now;
 259:                 }
 260:             }
 261:             done = true;
 262:             return futures;
 263:         } finally {
 264:             if (!done)
 265:                 for (Future<T> f : futures)
 266:                     f.cancel(true);
 267:         }
 268:     }
 269: 
 270: }