EDU.oswego.cs.dl.util.concurrent

Class PooledExecutor

Implemented Interfaces:
Executor

public class PooledExecutor
extends ThreadFactoryUser
implements Executor

A tunable, extensible thread pool class. The main supported public method is execute(Runnable command), which can be called instead of directly creating threads to execute commands.

Thread pools can be useful for several, usually intertwined reasons:

These goals introduce a number of policy parameters that are encapsulated in this class. All of these parameters have defaults and are tunable, either via get/set methods, or, in cases where decisions should hold across lifetimes, via methods that can be easily overridden in subclasses. The main, most commonly set parameters can be established in constructors. Policy choices across these dimensions can and do interact. Be careful, and please read this documentation completely before using! See also the usage examples below.

executeInteger.MAX_VALUE

negative

execute

execute
Other plausible policies include raising the maximum pool size after checking with some other objects that this is OK.

(These cases can never occur if the maximum pool size is unbounded or the queue is unbounded. In these cases you instead face potential resource exhaustion.) The execute method does not throw any checked exceptions in any of these cases since any errors associated with them must normally be dealt with via handlers or callbacks. (Although in some cases, these might be associated with throwing unchecked exceptions.) You may wish to add special implementations even if you choose one of the listed policies. For example, the supplied Discard policy does not inform the caller of the drop. You could add your own version that does so. Since choice of policies is normally a system-wide decision, selecting a policy affects all calls to execute. If for some reason you would instead like to make per-call decisions, you could add variant versions of the execute method (for example, executeIfWouldNotBlock) in subclasses.
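
As one illustration of a discard policy that does report the drop, here is a minimal sketch. It is written against the JDK's java.util.concurrent.ThreadPoolExecutor and its RejectedExecutionHandler rather than against PooledExecutor itself, an assumption made so that the example is self-contained. The class name CountingDiscard and its counter are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler: discards the task, but records that it did so.
final class CountingDiscard implements RejectedExecutionHandler {
  final AtomicInteger dropped = new AtomicInteger();

  @Override
  public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
    dropped.incrementAndGet();
  }

  // Saturates a one-thread, one-slot pool and returns the number of drops.
  static int demo() {
    CountingDiscard handler = new CountingDiscard();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(1), handler);
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocker = () -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    pool.execute(blocker);   // occupies the only thread
    pool.execute(blocker);   // fills the only queue slot
    pool.execute(blocker);   // dropped and counted
    pool.execute(blocker);   // dropped and counted
    release.countDown();
    pool.shutdown();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return handler.dropped.get();
  }
}
```

A caller can inspect the counter after submitting work, or the handler could instead invoke a callback supplied by the application.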

Usage examples.

 class MyPool {
   // initialize to use a maximum of 8 threads.
   static PooledExecutor pool = new PooledExecutor(8);
 }
 
  1. Using a bounded buffer of 10 tasks, at least 4 threads (started only when needed due to incoming requests), but allowing up to 100 threads if the buffer gets full.
            pool = new PooledExecutor(new BoundedBuffer(10), 100);
            pool.setMinimumPoolSize(4);
         
  2. Same as (1), except pre-start 9 threads, allowing them to die if they are not used for five minutes.
            pool = new PooledExecutor(new BoundedBuffer(10), 100);
            pool.setMinimumPoolSize(4);
            pool.setKeepAliveTime(1000 * 60 * 5);
            pool.createThreads(9);
         
  3. Same as (2) except clients abort if both the buffer is full and all 100 threads are busy:
            pool = new PooledExecutor(new BoundedBuffer(10), 100);
            pool.setMinimumPoolSize(4);
            pool.setKeepAliveTime(1000 * 60 * 5);
            pool.abortWhenBlocked();
            pool.createThreads(9);
         
  4. An unbounded queue serviced by exactly 5 threads:
            pool = new PooledExecutor(new LinkedQueue());
            pool.setKeepAliveTime(-1); // live forever
            pool.createThreads(5);
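
For comparison, variant 1 can be approximated on a JDK that ships java.util.concurrent. In ThreadPoolExecutor, the core size, maximum size, and bounded queue play roles similar to the minimum pool size, maximum pool size, and BoundedBuffer above. This is a sketch of an analogous configuration under that assumption, not PooledExecutor's own API, and the class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolSketch {
  // Core size 4, maximum size 100, queue of 10 tasks, one-minute keep-alive.
  static ThreadPoolExecutor newPool() {
    return new ThreadPoolExecutor(
        4, 100, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool = newPool();
    pool.execute(() ->
        System.out.println("ran in " + Thread.currentThread().getName()));
    pool.shutdown();
  }
}
```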
         

Usage notes.


Nested Class Summary

protected class
PooledExecutor.AbortWhenBlocked
Class defining Abort action.
static interface
PooledExecutor.BlockedExecutionHandler
Interface for actions to take when execute() blocks.
protected class
PooledExecutor.DiscardOldestWhenBlocked
Class defining DiscardOldest action.
protected class
PooledExecutor.DiscardWhenBlocked
Class defining Discard action.
protected class
PooledExecutor.RunWhenBlocked
Class defining Run action.
protected class
PooledExecutor.WaitWhenBlocked
Class defining Wait action.
protected class
PooledExecutor.Worker
Class defining the basic run loop for pooled threads.

Nested classes/interfaces inherited from class EDU.oswego.cs.dl.util.concurrent.ThreadFactoryUser

ThreadFactoryUser.DefaultThreadFactory

Field Summary

static long
DEFAULT_KEEPALIVETIME
The maximum time to keep worker threads alive waiting for new tasks; used if not otherwise specified.
static int
DEFAULT_MAXIMUMPOOLSIZE
The maximum pool size; used if not otherwise specified.
static int
DEFAULT_MINIMUMPOOLSIZE
The minimum pool size; used if not otherwise specified.
protected PooledExecutor.BlockedExecutionHandler
blockedExecutionHandler_
The current handler for unserviceable requests.
protected Channel
handOff_
The channel used to hand off the command to a thread in the pool.
protected long
keepAliveTime_
The maximum time for an idle thread to wait for a new task.
protected int
maximumPoolSize_
The maximum number of threads allowed in pool.
protected int
minimumPoolSize_
The minimum number of threads to maintain in pool.
protected int
poolSize_
Current pool size.
protected boolean
shutdown_
Shutdown flag - latches true when a shutdown method is called in order to disable queuing/handoffs of new tasks.
protected Map
threads_
The set of active threads, declared as a map from workers to their threads.

Fields inherited from class EDU.oswego.cs.dl.util.concurrent.ThreadFactoryUser

threadFactory_

Constructor Summary

PooledExecutor()
Create a new pool with all default settings.
PooledExecutor(Channel channel)
Create a new pool that uses the supplied Channel for queuing, and with all default parameter settings.
PooledExecutor(Channel channel, int maxPoolSize)
Create a new pool that uses the supplied Channel for queuing, and with all default parameter settings except for maximum pool size.
PooledExecutor(int maxPoolSize)
Create a new pool with all default settings except for maximum pool size.

Method Summary

void
abortWhenBlocked()
Set the policy for blocked execution to be to throw a RuntimeException.
protected void
addThread(Runnable command)
Create and start a thread to handle a new command.
void
awaitTerminationAfterShutdown()
Wait for a shutdown pool to fully terminate.
boolean
awaitTerminationAfterShutdown(long maxWaitTime)
Wait for a shutdown pool to fully terminate, or until the timeout has expired.
int
createThreads(int numberOfThreads)
Create and start up to numberOfThreads threads in the pool.
void
discardOldestWhenBlocked()
Set the policy for blocked execution to be to discard the oldest unhandled request.
void
discardWhenBlocked()
Set the policy for blocked execution to be to return without executing the request.
List
drain()
Remove all unprocessed tasks from pool queue, and return them in a java.util.List.
void
execute(Runnable command)
Arrange for the given command to be executed by a thread in this pool.
PooledExecutor.BlockedExecutionHandler
getBlockedExecutionHandler()
Get the handler for blocked execution.
long
getKeepAliveTime()
Return the number of milliseconds to keep threads alive waiting for new commands.
int
getMaximumPoolSize()
Return the maximum number of threads to simultaneously execute. New unqueued requests will be handled according to the current blocking policy once this limit is exceeded.
int
getMinimumPoolSize()
Return the minimum number of threads to simultaneously execute.
int
getPoolSize()
Return the current number of active threads in the pool.
protected Runnable
getTask()
Get a task from the handoff queue, or null if shutting down.
void
interruptAll()
Interrupt all threads in the pool, causing them all to terminate.
boolean
isTerminatedAfterShutdown()
Return true if a shutdown method has succeeded in terminating all threads.
void
runWhenBlocked()
Set the policy for blocked execution to be that the current thread executes the command if there are no available threads in the pool.
void
setBlockedExecutionHandler(PooledExecutor.BlockedExecutionHandler h)
Set the handler for blocked execution.
void
setKeepAliveTime(long msecs)
Set the number of milliseconds to keep threads alive waiting for new commands.
void
setMaximumPoolSize(int newMaximum)
Set the maximum number of threads to use.
void
setMinimumPoolSize(int newMinimum)
Set the minimum number of threads to use.
void
shutdownAfterProcessingCurrentlyQueuedTasks()
Terminate threads after processing all elements currently in queue.
void
shutdownAfterProcessingCurrentlyQueuedTasks(PooledExecutor.BlockedExecutionHandler handler)
Terminate threads after processing all elements currently in queue.
void
shutdownNow()
Interrupt all threads and disable construction of new threads.
void
shutdownNow(PooledExecutor.BlockedExecutionHandler handler)
Interrupt all threads and disable construction of new threads.
void
waitWhenBlocked()
Set the policy for blocked execution to be to wait until a thread is available, unless the pool has been shut down, in which case the action is discarded.
protected void
workerDone(PooledExecutor.Worker w)
Cleanup method called upon termination of worker thread.

Methods inherited from class EDU.oswego.cs.dl.util.concurrent.ThreadFactoryUser

getThreadFactory, setThreadFactory

Field Details

DEFAULT_KEEPALIVETIME

public static final long DEFAULT_KEEPALIVETIME
The maximum time to keep worker threads alive waiting for new tasks; used if not otherwise specified. Default value is one minute (60000 milliseconds).
Field Value:
60000L

DEFAULT_MAXIMUMPOOLSIZE

public static final int DEFAULT_MAXIMUMPOOLSIZE
The maximum pool size; used if not otherwise specified. Default value is essentially infinite (Integer.MAX_VALUE).

DEFAULT_MINIMUMPOOLSIZE

public static final int DEFAULT_MINIMUMPOOLSIZE
The minimum pool size; used if not otherwise specified. Default value is 1.
Field Value:
1

blockedExecutionHandler_

protected PooledExecutor.BlockedExecutionHandler blockedExecutionHandler_
The current handler for unserviceable requests.

handOff_

protected final Channel handOff_
The channel used to hand off the command to a thread in the pool.

keepAliveTime_

protected long keepAliveTime_
The maximum time for an idle thread to wait for a new task.

maximumPoolSize_

protected int maximumPoolSize_
The maximum number of threads allowed in pool.

minimumPoolSize_

protected int minimumPoolSize_
The minimum number of threads to maintain in pool.

poolSize_

protected int poolSize_
Current pool size.

shutdown_

protected boolean shutdown_
Shutdown flag - latches true when a shutdown method is called in order to disable queuing/handoffs of new tasks.

threads_

protected final Map threads_
The set of active threads, declared as a map from workers to their threads. This is needed by the interruptAll method. It may also be useful in subclasses that need to perform other thread management chores.

Constructor Details

PooledExecutor

public PooledExecutor()
Create a new pool with all default settings.

PooledExecutor

public PooledExecutor(Channel channel)
Create a new pool that uses the supplied Channel for queuing, and with all default parameter settings.

PooledExecutor

public PooledExecutor(Channel channel,
                      int maxPoolSize)
Create a new pool that uses the supplied Channel for queuing, and with all default parameter settings except for maximum pool size.

PooledExecutor

public PooledExecutor(int maxPoolSize)
Create a new pool with all default settings except for maximum pool size.

Method Details

abortWhenBlocked

public void abortWhenBlocked()
Set the policy for blocked execution to be to throw a RuntimeException.
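
A caller that selects an abort-style policy has to be prepared for the unchecked exception. The sketch below shows the pattern with the JDK's ThreadPoolExecutor and its AbortPolicy, which throws RejectedExecutionException (a RuntimeException). This is an analogous API, assumed here so that the example is self-contained, and the class name AbortSketch is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class AbortSketch {
  // Returns true if submitting to a saturated pool threw.
  static boolean secondTaskRejected() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.AbortPolicy());
    CountDownLatch release = new CountDownLatch(1);
    // Keeps the only thread busy until released.
    pool.execute(() -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    try {
      pool.execute(() -> { });   // no free thread and no queue capacity
      return false;
    } catch (RejectedExecutionException e) {
      return true;               // caller decides how to recover
    } finally {
      release.countDown();
      pool.shutdown();
    }
  }
}
```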

addThread

protected void addThread(Runnable command)
Create and start a thread to handle a new command. Call only when holding lock.

awaitTerminationAfterShutdown

public void awaitTerminationAfterShutdown()
            throws InterruptedException
Wait for a shutdown pool to fully terminate. This method may only be called after invoking shutdownNow or shutdownAfterProcessingCurrentlyQueuedTasks.

awaitTerminationAfterShutdown

public boolean awaitTerminationAfterShutdown(long maxWaitTime)
            throws InterruptedException
Wait for a shutdown pool to fully terminate, or until the timeout has expired. This method may only be called after invoking shutdownNow or shutdownAfterProcessingCurrentlyQueuedTasks.
Parameters:
maxWaitTime - the maximum time in milliseconds to wait
Returns:
true if the pool has terminated within the max wait period
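
The shutdown-then-wait ordering described above can be sketched as follows. The example uses the JDK's ExecutorService (shutdown and awaitTermination) as a stand-in for the PooledExecutor methods, and the helper name shutdownAndWait is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {
  // Shuts the pool down, then waits up to maxWaitMillis for termination.
  // Returns true if the pool terminated within the wait period.
  static boolean shutdownAndWait(ExecutorService pool, long maxWaitMillis) {
    pool.shutdown();   // shut down first, then wait
    try {
      return pool.awaitTermination(maxWaitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();   // preserve interrupt status
      return false;
    }
  }
}
```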

createThreads

public int createThreads(int numberOfThreads)
Create and start up to numberOfThreads threads in the pool. Return the number created. This may be less than the number requested if creating more would exceed maximum pool size bound.

discardOldestWhenBlocked

public void discardOldestWhenBlocked()
Set the policy for blocked execution to be to discard the oldest unhandled request.
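
To make the effect of a discard-oldest policy concrete, the sketch below uses the JDK's ThreadPoolExecutor with its DiscardOldestPolicy, an analogous facility assumed here for self-containment. The class name and the task labels are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DiscardOldestSketch {
  // Returns the labels of the tasks that actually ran, in order.
  static List<String> demo() {
    List<String> ran = new CopyOnWriteArrayList<>();
    CountDownLatch release = new CountDownLatch(1);
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1),
        new ThreadPoolExecutor.DiscardOldestPolicy());
    pool.execute(() -> {
      try {
        release.await();   // hold the only thread until released
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      ran.add("first");
    });
    pool.execute(() -> ran.add("second"));  // queued, later displaced
    pool.execute(() -> ran.add("third"));   // takes the place of "second"
    release.countDown();
    pool.shutdown();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return ran;
  }
}
```

The task labelled "second" is silently lost, which is why this policy suits workloads where newer requests supersede older ones.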

discardWhenBlocked

public void discardWhenBlocked()
Set the policy for blocked execution to be to return without executing the request.

drain

public List drain()
Remove all unprocessed tasks from pool queue, and return them in a java.util.List. This method should be used only when there are not any active clients of the pool. Otherwise you face the possibility that the method will loop pulling out tasks as clients are putting them in. This method can be useful after shutting down a pool (via shutdownNow) to determine whether there are any pending tasks that were not processed. You can then, for example, execute all unprocessed commands via code along the lines of:
   List tasks = pool.drain();
   for (Iterator it = tasks.iterator(); it.hasNext();) 
     ( (Runnable)(it.next()) ).run();
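
The same recover-and-run idea can be tried on a current JDK. In the sketch below, ThreadPoolExecutor.shutdownNow() stands in for the shutdownNow/drain pair, since it returns the tasks that never started. That substitution is an assumption of the example, and the class name DrainSketch is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DrainSketch {
  // Returns how many queued tasks were recovered and run by the caller.
  static int demo() {
    AtomicInteger counter = new AtomicInteger();
    CountDownLatch never = new CountDownLatch(1);
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    // Keeps the single worker busy until it is interrupted.
    pool.execute(() -> {
      try {
        never.await();
      } catch (InterruptedException e) {
        // exit on interrupt
      }
    });
    for (int i = 0; i < 3; i++) {
      pool.execute(counter::incrementAndGet);   // these wait in the queue
    }
    // Interrupts the worker and hands back the tasks that never started.
    List<Runnable> pending = pool.shutdownNow();
    for (Runnable task : pending) {
      task.run();   // run the leftovers in the calling thread
    }
    return counter.get();
  }
}
```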
 

execute

public void execute(Runnable command)
            throws InterruptedException
Arrange for the given command to be executed by a thread in this pool. The method normally returns when the command has been handed off for (possibly later) execution.
Specified by:
execute in interface Executor

getBlockedExecutionHandler

public PooledExecutor.BlockedExecutionHandler getBlockedExecutionHandler()
Get the handler for blocked execution.

getKeepAliveTime

public long getKeepAliveTime()
Return the number of milliseconds to keep threads alive waiting for new commands. A negative value means to wait forever. A zero value means not to wait at all.

getMaximumPoolSize

public int getMaximumPoolSize()
Return the maximum number of threads to simultaneously execute. New unqueued requests will be handled according to the current blocking policy once this limit is exceeded.

getMinimumPoolSize

public int getMinimumPoolSize()
Return the minimum number of threads to simultaneously execute. (Default value is 1). If fewer than the minimum number are running upon reception of a new request, a new thread is started to handle this request.

getPoolSize

public int getPoolSize()
Return the current number of active threads in the pool. This number is just a snapshot, and may change immediately upon returning.

getTask

protected Runnable getTask()
            throws InterruptedException
Get a task from the handoff queue, or null if shutting down.

interruptAll

public void interruptAll()
Interrupt all threads in the pool, causing them all to terminate. Assuming that executed tasks do not disable (clear) interruptions, each thread will terminate after processing its current task. Threads will terminate sooner if the executed tasks themselves respond to interrupts.
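
A task that responds to interrupts, as mentioned above, might look like the following minimal sketch. It uses a plain Thread rather than a pool so that it stands alone; the class and method names are hypothetical.

```java
final class InterruptibleTaskSketch {
  // Loops until the executing thread is interrupted, then returns normally.
  static Runnable pollingTask() {
    return () -> {
      while (!Thread.currentThread().isInterrupted()) {
        Thread.yield();   // stand-in for one unit of real work
      }
    };
  }

  // Starts the task, interrupts it, and reports whether it stopped.
  static boolean stopsWhenInterrupted() {
    Thread worker = new Thread(pollingTask());
    worker.start();
    worker.interrupt();
    try {
      worker.join(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !worker.isAlive();
  }
}
```

A task that never checks its interrupt status and never blocks will simply run to completion before its thread can terminate.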

isTerminatedAfterShutdown

public boolean isTerminatedAfterShutdown()
Return true if a shutdown method has succeeded in terminating all threads.

runWhenBlocked

public void runWhenBlocked()
Set the policy for blocked execution to be that the current thread executes the command if there are no available threads in the pool.
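
The run-in-caller behaviour can be observed with the JDK's ThreadPoolExecutor and its CallerRunsPolicy, used here as an analogue under the assumption that a self-contained example is wanted. The class name CallerRunsSketch is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CallerRunsSketch {
  // Returns true if the overflow task ran on the submitting thread.
  static boolean overflowRunsInCaller() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
    CountDownLatch release = new CountDownLatch(1);
    Thread[] ranOn = new Thread[1];
    // Keeps the only pool thread busy until released.
    pool.execute(() -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    // The pool is saturated, so this runs synchronously in the caller.
    pool.execute(() -> ranOn[0] = Thread.currentThread());
    release.countDown();
    pool.shutdown();
    return ranOn[0] == Thread.currentThread();
  }
}
```

Because the submitting thread is busy running the task, it cannot submit further work in the meantime, which acts as a simple form of throttling.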

setBlockedExecutionHandler

public void setBlockedExecutionHandler(PooledExecutor.BlockedExecutionHandler h)
Set the handler for blocked execution.

setKeepAliveTime

public void setKeepAliveTime(long msecs)
Set the number of milliseconds to keep threads alive waiting for new commands. A negative value means to wait forever. A zero value means not to wait at all.
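
The three cases of the keep-alive convention (negative, zero, positive) can be sketched with a JDK BlockingQueue. This illustrates the convention only; it is not PooledExecutor's actual task-fetching code, and the helper name nextTask is hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class KeepAliveSketch {
  // Hypothetical helper: fetches the next task, honoring the keep-alive
  // convention. Returns null if no task arrived within the allowed wait.
  static Runnable nextTask(BlockingQueue<Runnable> queue, long keepAliveMillis)
      throws InterruptedException {
    if (keepAliveMillis < 0) {
      return queue.take();   // negative: wait forever
    } else if (keepAliveMillis == 0) {
      return queue.poll();   // zero: do not wait at all
    } else {
      return queue.poll(keepAliveMillis, TimeUnit.MILLISECONDS);  // bounded wait
    }
  }

  // An empty queue with a zero or short keep-alive yields no task.
  static boolean emptyQueueTimesOut() {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    try {
      return nextTask(queue, 0) == null && nextTask(queue, 10) == null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```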

setMaximumPoolSize

public void setMaximumPoolSize(int newMaximum)
Set the maximum number of threads to use. Decreasing the pool size will not immediately kill existing threads, but they may later die when idle.

setMinimumPoolSize

public void setMinimumPoolSize(int newMinimum)
Set the minimum number of threads to use.

shutdownAfterProcessingCurrentlyQueuedTasks

public void shutdownAfterProcessingCurrentlyQueuedTasks()
Terminate threads after processing all elements currently in queue. Any tasks entered after this point will be discarded. A shut down pool cannot be restarted.

shutdownAfterProcessingCurrentlyQueuedTasks

public void shutdownAfterProcessingCurrentlyQueuedTasks(PooledExecutor.BlockedExecutionHandler handler)
Terminate threads after processing all elements currently in queue. Any tasks entered after this point will be handled by the given BlockedExecutionHandler. A shut down pool cannot be restarted.

shutdownNow

public void shutdownNow()
Interrupt all threads and disable construction of new threads. Any tasks entered after this point will be discarded. A shut down pool cannot be restarted.

shutdownNow

public void shutdownNow(PooledExecutor.BlockedExecutionHandler handler)
Interrupt all threads and disable construction of new threads. Any tasks entered after this point will be handled by the given BlockedExecutionHandler. A shut down pool cannot be restarted.

waitWhenBlocked

public void waitWhenBlocked()
Set the policy for blocked execution to be to wait until a thread is available, unless the pool has been shut down, in which case the action is discarded.

workerDone

protected void workerDone(PooledExecutor.Worker w)
Cleanup method called upon termination of worker thread.