1:
6:
7: package ;
8: import ;
9:
10:
42: public abstract class AbstractExecutorService implements ExecutorService {
43:
44:
56: protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
57: return new FutureTask<T>(runnable, value);
58: }
59:
60:
70: protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
71: return new FutureTask<T>(callable);
72: }
73:
74: public Future<?> submit(Runnable task) {
75: if (task == null) throw new NullPointerException();
76: RunnableFuture<Object> ftask = newTaskFor(task, null);
77: execute(ftask);
78: return ftask;
79: }
80:
81: public <T> Future<T> submit(Runnable task, T result) {
82: if (task == null) throw new NullPointerException();
83: RunnableFuture<T> ftask = newTaskFor(task, result);
84: execute(ftask);
85: return ftask;
86: }
87:
88: public <T> Future<T> submit(Callable<T> task) {
89: if (task == null) throw new NullPointerException();
90: RunnableFuture<T> ftask = newTaskFor(task);
91: execute(ftask);
92: return ftask;
93: }
94:
95:
98: private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
99: boolean timed, long nanos)
100: throws InterruptedException, ExecutionException, TimeoutException {
101: if (tasks == null)
102: throw new NullPointerException();
103: int ntasks = tasks.size();
104: if (ntasks == 0)
105: throw new IllegalArgumentException();
106: List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
107: ExecutorCompletionService<T> ecs =
108: new ExecutorCompletionService<T>(this);
109:
110:
111:
112:
113:
114:
115:
116: try {
117:
118:
119: ExecutionException ee = null;
120: long lastTime = (timed)? System.nanoTime() : 0;
121: Iterator<? extends Callable<T>> it = tasks.iterator();
122:
123:
124: futures.add(ecs.submit(it.next()));
125: --ntasks;
126: int active = 1;
127:
128: for (;;) {
129: Future<T> f = ecs.poll();
130: if (f == null) {
131: if (ntasks > 0) {
132: --ntasks;
133: futures.add(ecs.submit(it.next()));
134: ++active;
135: }
136: else if (active == 0)
137: break;
138: else if (timed) {
139: f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
140: if (f == null)
141: throw new TimeoutException();
142: long now = System.nanoTime();
143: nanos -= now - lastTime;
144: lastTime = now;
145: }
146: else
147: f = ecs.take();
148: }
149: if (f != null) {
150: --active;
151: try {
152: return f.get();
153: } catch (InterruptedException ie) {
154: throw ie;
155: } catch (ExecutionException eex) {
156: ee = eex;
157: } catch (RuntimeException rex) {
158: ee = new ExecutionException(rex);
159: }
160: }
161: }
162:
163: if (ee == null)
164: ee = new ExecutionException();
165: throw ee;
166:
167: } finally {
168: for (Future<T> f : futures)
169: f.cancel(true);
170: }
171: }
172:
173: public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
174: throws InterruptedException, ExecutionException {
175: try {
176: return doInvokeAny(tasks, false, 0);
177: } catch (TimeoutException cannotHappen) {
178: assert false;
179: return null;
180: }
181: }
182:
183: public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
184: long timeout, TimeUnit unit)
185: throws InterruptedException, ExecutionException, TimeoutException {
186: return doInvokeAny(tasks, true, unit.toNanos(timeout));
187: }
188:
189: public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
190: throws InterruptedException {
191: if (tasks == null)
192: throw new NullPointerException();
193: List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
194: boolean done = false;
195: try {
196: for (Callable<T> t : tasks) {
197: RunnableFuture<T> f = newTaskFor(t);
198: futures.add(f);
199: execute(f);
200: }
201: for (Future<T> f : futures) {
202: if (!f.isDone()) {
203: try {
204: f.get();
205: } catch (CancellationException ignore) {
206: } catch (ExecutionException ignore) {
207: }
208: }
209: }
210: done = true;
211: return futures;
212: } finally {
213: if (!done)
214: for (Future<T> f : futures)
215: f.cancel(true);
216: }
217: }
218:
219: public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
220: long timeout, TimeUnit unit)
221: throws InterruptedException {
222: if (tasks == null || unit == null)
223: throw new NullPointerException();
224: long nanos = unit.toNanos(timeout);
225: List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
226: boolean done = false;
227: try {
228: for (Callable<T> t : tasks)
229: futures.add(newTaskFor(t));
230:
231: long lastTime = System.nanoTime();
232:
233:
234:
235: Iterator<Future<T>> it = futures.iterator();
236: while (it.hasNext()) {
237: execute((Runnable)(it.next()));
238: long now = System.nanoTime();
239: nanos -= now - lastTime;
240: lastTime = now;
241: if (nanos <= 0)
242: return futures;
243: }
244:
245: for (Future<T> f : futures) {
246: if (!f.isDone()) {
247: if (nanos <= 0)
248: return futures;
249: try {
250: f.get(nanos, TimeUnit.NANOSECONDS);
251: } catch (CancellationException ignore) {
252: } catch (ExecutionException ignore) {
253: } catch (TimeoutException toe) {
254: return futures;
255: }
256: long now = System.nanoTime();
257: nanos -= now - lastTime;
258: lastTime = now;
259: }
260: }
261: done = true;
262: return futures;
263: } finally {
264: if (!done)
265: for (Future<T> f : futures)
266: f.cancel(true);
267: }
268: }
269:
270: }