GNU Classpath (0.98) | |
Frames | No Frames |
1: /* 2: * Written by Doug Lea with assistance from members of JCP JSR-166 3: * Expert Group and released to the public domain, as explained at 4: * http://creativecommons.org/licenses/publicdomain 5: */ 6: 7: package java.util.concurrent; 8: 9: /** 10: * A {@link CompletionService} that uses a supplied {@link Executor} 11: * to execute tasks. This class arranges that submitted tasks are, 12: * upon completion, placed on a queue accessible using <tt>take</tt>. 13: * The class is lightweight enough to be suitable for transient use 14: * when processing groups of tasks. 15: * 16: * <p> 17: * 18: * <b>Usage Examples.</b> 19: * 20: * Suppose you have a set of solvers for a certain problem, each 21: * returning a value of some type <tt>Result</tt>, and would like to 22: * run them concurrently, processing the results of each of them that 23: * return a non-null value, in some method <tt>use(Result r)</tt>. You 24: * could write this as: 25: * 26: * <pre> 27: * void solve(Executor e, 28: * Collection<Callable<Result>> solvers) 29: * throws InterruptedException, ExecutionException { 30: * CompletionService<Result> ecs 31: * = new ExecutorCompletionService<Result>(e); 32: * for (Callable<Result> s : solvers) 33: * ecs.submit(s); 34: * int n = solvers.size(); 35: * for (int i = 0; i < n; ++i) { 36: * Result r = ecs.take().get(); 37: * if (r != null) 38: * use(r); 39: * } 40: * } 41: * </pre> 42: * 43: * Suppose instead that you would like to use the first non-null result 44: * of the set of tasks, ignoring any that encounter exceptions, 45: * and cancelling all other tasks when the first one is ready: 46: * 47: * <pre> 48: * void solve(Executor e, 49: * Collection<Callable<Result>> solvers) 50: * throws InterruptedException { 51: * CompletionService<Result> ecs 52: * = new ExecutorCompletionService<Result>(e); 53: * int n = solvers.size(); 54: * List<Future<Result>> futures 55: * = new ArrayList<Future<Result>>(n); 56: * Result result = null; 57: * try { 58: * for (Callable<Result> s : solvers) 59: * futures.add(ecs.submit(s)); 60: * for (int i = 0; i < n; ++i) { 61: * try { 62: * Result r = ecs.take().get(); 63: * if (r != null) { 64: * result = r; 65: * break; 66: * } 67: * } catch (ExecutionException ignore) {} 68: * } 69: * } 70: * finally { 71: * for (Future<Result> f : futures) 72: * f.cancel(true); 73: * } 74: * 75: * if (result != null) 76: * use(result); 77: * } 78: * </pre> 79: */ 80: public class ExecutorCompletionService<V> implements CompletionService<V> { 81: private final Executor executor; 82: private final AbstractExecutorService aes; 83: private final BlockingQueue<Future<V>> completionQueue; 84: 85: /** 86: * FutureTask extension to enqueue upon completion 87: */ 88: private class QueueingFuture extends FutureTask<Void> { 89: QueueingFuture(RunnableFuture<V> task) { 90: super(task, null); 91: this.task = task; 92: } 93: protected void done() { completionQueue.add(task); } 94: private final Future<V> task; 95: } 96: 97: private RunnableFuture<V> newTaskFor(Callable<V> task) { 98: if (aes == null) 99: return new FutureTask<V>(task); 100: else 101: return aes.newTaskFor(task); 102: } 103: 104: private RunnableFuture<V> newTaskFor(Runnable task, V result) { 105: if (aes == null) 106: return new FutureTask<V>(task, result); 107: else 108: return aes.newTaskFor(task, result); 109: } 110: 111: /** 112: * Creates an ExecutorCompletionService using the supplied 113: * executor for base task execution and a 114: * {@link LinkedBlockingQueue} as a completion queue. 115: * 116: * @param executor the executor to use 117: * @throws NullPointerException if executor is <tt>null</tt> 118: */ 119: public ExecutorCompletionService(Executor executor) { 120: if (executor == null) 121: throw new NullPointerException(); 122: this.executor = executor; 123: this.aes = (executor instanceof AbstractExecutorService) ? 124: (AbstractExecutorService) executor : null; 125: this.completionQueue = new LinkedBlockingQueue<Future<V>>(); 126: } 127: 128: /** 129: * Creates an ExecutorCompletionService using the supplied 130: * executor for base task execution and the supplied queue as its 131: * completion queue. 132: * 133: * @param executor the executor to use 134: * @param completionQueue the queue to use as the completion queue 135: * normally one dedicated for use by this service 136: * @throws NullPointerException if executor or completionQueue are <tt>null</tt> 137: */ 138: public ExecutorCompletionService(Executor executor, 139: BlockingQueue<Future<V>> completionQueue) { 140: if (executor == null || completionQueue == null) 141: throw new NullPointerException(); 142: this.executor = executor; 143: this.aes = (executor instanceof AbstractExecutorService) ? 144: (AbstractExecutorService) executor : null; 145: this.completionQueue = completionQueue; 146: } 147: 148: public Future<V> submit(Callable<V> task) { 149: if (task == null) throw new NullPointerException(); 150: RunnableFuture<V> f = newTaskFor(task); 151: executor.execute(new QueueingFuture(f)); 152: return f; 153: } 154: 155: public Future<V> submit(Runnable task, V result) { 156: if (task == null) throw new NullPointerException(); 157: RunnableFuture<V> f = newTaskFor(task, result); 158: executor.execute(new QueueingFuture(f)); 159: return f; 160: } 161: 162: public Future<V> take() throws InterruptedException { 163: return completionQueue.take(); 164: } 165: 166: public Future<V> poll() { 167: return completionQueue.poll(); 168: } 169: 170: public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { 171: return completionQueue.poll(timeout, unit); 172: } 173: 174: }
GNU Classpath (0.98) |