cctools
|
A master-worker library. More...
#include "timestamp.h"
Go to the source code of this file.
Data Structures | |
struct | work_queue_task |
A task description. More... | |
struct | work_queue_stats |
Statistics describing a work queue. More... |
Macros | |
#define | WORK_QUEUE_DEFAULT_PORT 9123 |
Default Work Queue port number. | |
#define | WORK_QUEUE_RANDOM_PORT 0 |
Indicates that any port number may be chosen. | |
#define | WORK_QUEUE_WAITFORTASK -1 |
Wait for a task to complete before returning. | |
#define | WORK_QUEUE_SCHEDULE_FCFS 1 |
Select worker on a first-come-first-serve basis. | |
#define | WORK_QUEUE_SCHEDULE_FILES 2 |
Select worker that has the most data required by the task. | |
#define | WORK_QUEUE_SCHEDULE_TIME 3 |
Select worker that has the fastest execution time on previous tasks. | |
#define | WORK_QUEUE_SCHEDULE_RAND 4 |
Select a random worker. | |
#define | WORK_QUEUE_TASK_ORDER_FIFO 0 |
Retrieve tasks based on first-in-first-out order. | |
#define | WORK_QUEUE_TASK_ORDER_LIFO 1 |
Retrieve tasks based on last-in-first-out order. | |
#define | WORK_QUEUE_INPUT 0 |
Specify an input object. | |
#define | WORK_QUEUE_OUTPUT 1 |
Specify an output object. | |
#define | WORK_QUEUE_NOCACHE 0 |
Do not cache file at execution site. | |
#define | WORK_QUEUE_CACHE 1 |
Cache file at execution site for later use. | |
#define | WORK_QUEUE_MASTER_MODE_STANDALONE 0 |
Work Queue master does not report to the catalog server. | |
#define | WORK_QUEUE_MASTER_MODE_CATALOG 1 |
Work Queue master reports to catalog server. |
Functions | |
Functions - Tasks | |
struct work_queue_task * | work_queue_task_create (const char *full_command) |
Create a new task object. | |
void | work_queue_task_specify_file (struct work_queue_task *t, const char *local_name, const char *remote_name, int type, int flags) |
Add a file to a task. | |
void | work_queue_task_specify_buffer (struct work_queue_task *t, const char *data, int length, const char *remote_name, int flags) |
Add an input buffer to a task. | |
void | work_queue_task_specify_file_command (struct work_queue_task *t, const char *remote_name, const char *cmd, int type, int flags) |
void | work_queue_task_specify_tag (struct work_queue_task *t, const char *tag) |
Attach a user defined string tag to the task. | |
void | work_queue_task_specify_algorithm (struct work_queue_task *t, int algo) |
Select the scheduling algorithm for a single task. | |
void | work_queue_task_delete (struct work_queue_task *t) |
Delete a task. | |
Functions - Queues | |
struct work_queue * | work_queue_create (int port) |
Create a new work queue. | |
int | work_queue_submit (struct work_queue *q, struct work_queue_task *t) |
Submit a task to a queue. | |
struct work_queue_task * | work_queue_wait (struct work_queue *q, int timeout) |
Wait for a task to complete. | |
int | work_queue_hungry (struct work_queue *q) |
Determine whether the queue is 'hungry' for more tasks. | |
int | work_queue_empty (struct work_queue *q) |
Determine whether the queue is empty. | |
int | work_queue_port (struct work_queue *q) |
Get the listening port of the queue. | |
const char * | work_queue_name (struct work_queue *q) |
Get the project name of the queue. | |
void | work_queue_get_stats (struct work_queue *q, struct work_queue_stats *s) |
Get queue statistics. | |
char * | work_queue_get_worker_summary (struct work_queue *q) |
Summarize workers. | |
int | work_queue_activate_fast_abort (struct work_queue *q, double multiplier) |
Turn on or off fast abort functionality for a given queue. | |
void | work_queue_specify_algorithm (struct work_queue *q, int algo) |
Change the worker selection algorithm. | |
void | work_queue_specify_task_order (struct work_queue *q, int order) |
Specify how the submitted tasks should be ordered. | |
void | work_queue_specify_name (struct work_queue *q, const char *name) |
Change the project name for a given queue. | |
void | work_queue_specify_priority (struct work_queue *q, int priority) |
Change the priority for a given queue. | |
void | work_queue_specify_master_mode (struct work_queue *q, int mode) |
Specify the master mode for a given queue. | |
struct work_queue_task * | work_queue_cancel_by_taskid (struct work_queue *q, int id) |
Cancel a submitted task using its task id and remove it from queue. | |
struct work_queue_task * | work_queue_cancel_by_tasktag (struct work_queue *q, const char *tag) |
Cancel a submitted task using its tag and remove it from queue. | |
int | work_queue_shut_down_workers (struct work_queue *q, int n) |
Shut down workers connected to the work_queue system. | |
void | work_queue_delete (struct work_queue *q) |
Delete a work queue. | |
Functions - Deprecated | |
void | work_queue_task_specify_input_buf (struct work_queue_task *t, const char *buf, int length, const char *rname) |
Add an input buffer to a task. | |
void | work_queue_task_specify_input_file (struct work_queue_task *t, const char *fname, const char *rname) |
Add an input file to a task. | |
void | work_queue_task_specify_input_file_do_not_cache (struct work_queue_task *t, const char *fname, const char *rname) |
Add an input file to a task, without caching. | |
void | work_queue_task_specify_output_file (struct work_queue_task *t, const char *rname, const char *fname) |
Add an output file to a task. | |
void | work_queue_task_specify_output_file_do_not_cache (struct work_queue_task *t, const char *rname, const char *fname) |
Add an output file to a task without caching. |
Variables | |
double | wq_option_fast_abort_multiplier |
Initial setting for fast abort multiplier upon creating queue. | |
int | wq_option_scheduler |
Initial setting for algorithm to assign tasks to workers upon creating queue. |
A master-worker library.
The work queue provides an implementation of the master-worker computing model using TCP sockets, Unix applications, and files as intermediate buffers. A master process uses work_queue_create to create a queue, then work_queue_submit to submit tasks. Once tasks are running, call work_queue_wait to wait for completion. The generic worker
program can be run on any machine, and simply needs to be told the host and port of the master.
#define WORK_QUEUE_DEFAULT_PORT 9123 |
Default Work Queue port number.
#define WORK_QUEUE_RANDOM_PORT 0 |
Indicates that any port number may be chosen.
#define WORK_QUEUE_WAITFORTASK -1 |
Wait for a task to complete before returning.
#define WORK_QUEUE_SCHEDULE_FCFS 1 |
Select worker on a first-come-first-serve basis.
#define WORK_QUEUE_SCHEDULE_FILES 2 |
Select worker that has the most data required by the task.
#define WORK_QUEUE_SCHEDULE_TIME 3 |
Select worker that has the fastest execution time on previous tasks.
#define WORK_QUEUE_SCHEDULE_RAND 4 |
Select a random worker.
#define WORK_QUEUE_TASK_ORDER_FIFO 0 |
Retrieve tasks based on first-in-first-out order.
#define WORK_QUEUE_TASK_ORDER_LIFO 1 |
Retrieve tasks based on last-in-first-out order.
#define WORK_QUEUE_INPUT 0 |
Specify an input object.
#define WORK_QUEUE_OUTPUT 1 |
Specify an output object.
#define WORK_QUEUE_NOCACHE 0 |
Do not cache file at execution site.
#define WORK_QUEUE_CACHE 1 |
Cache file at execution site for later use.
#define WORK_QUEUE_MASTER_MODE_STANDALONE 0 |
Work Queue master does not report to the catalog server.
#define WORK_QUEUE_MASTER_MODE_CATALOG 1 |
Work Queue master reports to catalog server.
struct work_queue_task* work_queue_task_create | ( | const char * | full_command | ) |
Create a new task object.
Once created and elaborated with functions such as work_queue_task_specify_file and work_queue_task_specify_buffer, the task should be passed to work_queue_submit.
full_command | The shell command line to be executed by the task. |
Referenced by work_queue.Task::__init__().
void work_queue_task_specify_file | ( | struct work_queue_task * | t, |
const char * | local_name, | ||
const char * | remote_name, | ||
int | type, | ||
int | flags | ||
) |
Add a file to a task.
t | A task object. |
local_name | The name of the file on local disk or shared filesystem. |
remote_name | The name of the file at the remote execution site. |
type | Must be one of the following values: WORK_QUEUE_INPUT (specify an input object) or WORK_QUEUE_OUTPUT (specify an output object). |
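flags | May be zero to indicate no special handling or any of the following or'd together: WORK_QUEUE_NOCACHE (do not cache file at execution site), WORK_QUEUE_CACHE (cache file at execution site for later use). |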
Referenced by work_queue.Task::specify_file().
void work_queue_task_specify_buffer | ( | struct work_queue_task * | t, |
const char * | data, | ||
int | length, | ||
const char * | remote_name, | ||
int | flags | ||
) |
Add an input buffer to a task.
t | A task object. |
data | The data to be passed as an input file. |
length | The length of the buffer, in bytes |
remote_name | The name of the remote file to create. |
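flags | May be zero to indicate no special handling or any of the following or'd together: WORK_QUEUE_NOCACHE (do not cache file at execution site), WORK_QUEUE_CACHE (cache file at execution site for later use). |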
Referenced by work_queue.Task::specify_buffer().
void work_queue_task_specify_tag | ( | struct work_queue_task * | t, |
const char * | tag | ||
) |
Attach a user defined string tag to the task.
This field is not interpreted by the work queue, but is provided for the user's convenience in identifying tasks when they complete.
t | A task object. |
tag | The tag to attach to task t. |
Referenced by work_queue.Task::specify_tag().
void work_queue_task_specify_algorithm | ( | struct work_queue_task * | t, |
int | algo | ||
) |
Select the scheduling algorithm for a single task.
To change the scheduling algorithm for all tasks, use work_queue_specify_algorithm instead.
t | A task object. |
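algo | The algorithm to use in assigning this task to a worker: WORK_QUEUE_SCHEDULE_FCFS (select worker on a first-come-first-serve basis), WORK_QUEUE_SCHEDULE_FILES (select worker that has the most data required by the task), WORK_QUEUE_SCHEDULE_TIME (select worker that has the fastest execution time on previous tasks), or WORK_QUEUE_SCHEDULE_RAND (select a random worker). |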
Referenced by work_queue.Task::specify_algorithm().
void work_queue_task_delete | ( | struct work_queue_task * | t | ) |
Delete a task.
This may be called on tasks after they are returned from work_queue_wait.
t | The task to delete. |
Referenced by work_queue.Task::__init__().
struct work_queue* work_queue_create | ( | int | port | ) |
Create a new work queue.
Users may modify the behavior of work_queue_create by setting the following environmental variables before calling the function:
If the queue has a project name, then queue statistics and information will be reported to a catalog server. To specify the catalog server, the user may set the CATALOG_HOST and CATALOG_PORT environmental variables as described in catalog_query_create.
port | The port number to listen on. If zero is specified, then the default is chosen, and if -1 is specified, a random port is chosen. |
Referenced by work_queue.WorkQueue::__init__().
int work_queue_submit | ( | struct work_queue * | q, |
struct work_queue_task * | t | ||
) |
Submit a task to a queue.
Once a task is submitted to a queue, it is no longer under the user's control and should not be inspected until returned via work_queue_wait. Once returned, it is safe to re-submit the same task object via work_queue_submit.
q | A work queue object. |
t | A task object returned from work_queue_task_create. |
Referenced by work_queue.WorkQueue::submit().
struct work_queue_task* work_queue_wait | ( | struct work_queue * | q, |
int | timeout | ||
) |
Wait for a task to complete.
This call will block until either a task has completed, the timeout has expired, or the queue is empty. If a task has completed, the corresponding task object will be returned by this function. The caller may examine the task and then dispose of it using work_queue_task_delete.
If the task ran to completion, then the result field will be zero and the return_status field will contain the Unix exit code of the task. If the task could not be run to completion, then the result field will be non-zero and the return_status field will be undefined.
q | A work queue object. |
timeout | The number of seconds to wait for a completed task before returning. Use an integer time to set the timeout or the constant WORK_QUEUE_WAITFORTASK to block until a task has completed. |
Referenced by work_queue.WorkQueue::wait().
int work_queue_hungry | ( | struct work_queue * | q | ) |
Determine whether the queue is 'hungry' for more tasks.
While the Work Queue can handle a very large number of tasks, it runs most efficiently when the number of tasks is slightly larger than the number of active workers. This function gives the user of a flexible application a hint about whether it would be better to submit more tasks via work_queue_submit or wait for some to complete via work_queue_wait.
q | A work queue object. |
Referenced by work_queue.WorkQueue::hungry().
int work_queue_empty | ( | struct work_queue * | q | ) |
Determine whether the queue is empty.
When all of the desired tasks have been submitted to the queue, the user should continue to call work_queue_wait until this function returns true.
q | A work queue object. |
Referenced by work_queue.WorkQueue::empty().
int work_queue_port | ( | struct work_queue * | q | ) |
Get the listening port of the queue.
As noted in work_queue_create, there are many controls that affect what TCP port the queue will listen on. Rather than assuming a specific port, the user should simply call this function to determine what port was selected.
q | A work queue object. |
Referenced by work_queue.WorkQueue::__init__().
const char* work_queue_name | ( | struct work_queue * | q | ) |
Get the project name of the queue.
q | A work queue object. |
Referenced by work_queue.WorkQueue::__init__().
void work_queue_get_stats | ( | struct work_queue * | q, |
struct work_queue_stats * | s | ||
) |
Get queue statistics.
q | A work queue object. |
s | A pointer to a buffer that will be filled with statistics. |
Referenced by work_queue.WorkQueue::__init__().
char* work_queue_get_worker_summary | ( | struct work_queue * | q | ) |
Summarize workers.
This function summarizes the workers currently connected to the master, indicating how many from each worker pool are attached.
q | A work queue object. |
int work_queue_activate_fast_abort | ( | struct work_queue * | q, |
double | multiplier | ||
) |
Turn on or off fast abort functionality for a given queue.
q | A work queue object. |
multiplier | The multiplier of the average task time at which point to abort; if negative (and by default) fast_abort is deactivated. |
Referenced by work_queue.WorkQueue::activate_fast_abort().
void work_queue_specify_algorithm | ( | struct work_queue * | q, |
int | algo | ||
) |
Change the worker selection algorithm.
Note that this function controls which worker will be selected for a given task while work_queue_specify_task_order controls which task will be executed next.
q | A work queue object. |
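algo | The algorithm to use in assigning a task to a worker: WORK_QUEUE_SCHEDULE_FCFS (select worker on a first-come-first-serve basis), WORK_QUEUE_SCHEDULE_FILES (select worker that has the most data required by the task), WORK_QUEUE_SCHEDULE_TIME (select worker that has the fastest execution time on previous tasks), or WORK_QUEUE_SCHEDULE_RAND (select a random worker). |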
Referenced by work_queue.WorkQueue::specify_algorithm().
void work_queue_specify_task_order | ( | struct work_queue * | q, |
int | order | ||
) |
Specify how the submitted tasks should be ordered.
Note that this function controls which task to execute next, while work_queue_specify_algorithm controls which worker it should be assigned to.
q | A work queue object. |
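order | The ordering to use for dispatching submitted tasks: WORK_QUEUE_TASK_ORDER_FIFO (retrieve tasks based on first-in-first-out order) or WORK_QUEUE_TASK_ORDER_LIFO (retrieve tasks based on last-in-first-out order). |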
Referenced by work_queue.WorkQueue::specify_task_order().
void work_queue_specify_name | ( | struct work_queue * | q, |
const char * | name | ||
) |
Change the project name for a given queue.
q | A work queue object. |
name | The new project name. |
Referenced by work_queue.WorkQueue::__init__(), and work_queue.WorkQueue::specify_name().
void work_queue_specify_priority | ( | struct work_queue * | q, |
int | priority | ||
) |
Change the priority for a given queue.
q | A work queue object. |
priority | The new priority of the queue. Higher priority masters will attract workers first. |
Referenced by work_queue.WorkQueue::specify_priority().
void work_queue_specify_master_mode | ( | struct work_queue * | q, |
int | mode | ||
) |
Specify the master mode for a given queue.
q | A work queue object. |
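mode | One of the following values: WORK_QUEUE_MASTER_MODE_STANDALONE (the master does not report to the catalog server) or WORK_QUEUE_MASTER_MODE_CATALOG (the master reports to the catalog server). |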
Referenced by work_queue.WorkQueue::__init__(), and work_queue.WorkQueue::specify_master_mode().
struct work_queue_task* work_queue_cancel_by_taskid | ( | struct work_queue * | q, |
int | id | ||
) |
Cancel a submitted task using its task id and remove it from queue.
q | A work queue object. |
id | The taskid returned from work_queue_submit. |
Referenced by work_queue.WorkQueue::cancel_by_taskid().
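struct work_queue_task* work_queue_cancel_by_tasktag | ( | struct work_queue * | q, |
const char * | tag | ||
) |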
Cancel a submitted task using its tag and remove it from queue.
q | A work queue object. |
tag | The tag name assigned to task using work_queue_task_specify_tag. |
Referenced by work_queue.WorkQueue::cancel_by_tasktag().
int work_queue_shut_down_workers | ( | struct work_queue * | q, |
int | n | ||
) |
Shut down workers connected to the work_queue system.
Gives a best effort and then returns the number of workers given the shut down order.
q | A work queue object. |
n | The number to shut down. All workers if given "0". |
Referenced by work_queue.WorkQueue::shutdown_workers().
void work_queue_delete | ( | struct work_queue * | q | ) |
Delete a work queue.
This function should only be called after work_queue_empty returns true.
q | A work queue to delete. |
Referenced by work_queue.WorkQueue::__init__().
void work_queue_task_specify_input_buf | ( | struct work_queue_task * | t, |
const char * | buf, | ||
int | length, | ||
const char * | rname | ||
) |
Add an input buffer to a task.
t | The task to which to add parameters |
buf | A pointer to the data buffer to send to the worker to be available to the commands. |
length | The number of bytes of data in the buffer |
rname | The name of the file in which to store the buffer data on the worker |
void work_queue_task_specify_input_file | ( | struct work_queue_task * | t, |
const char * | fname, | ||
const char * | rname | ||
) |
Add an input file to a task.
t | The task to which to add parameters |
fname | The name of the data file to send to the worker to be available to the commands. |
rname | The name of the file in which to store the file data on the worker. |
void work_queue_task_specify_input_file_do_not_cache | ( | struct work_queue_task * | t, |
const char * | fname, | ||
const char * | rname | ||
) |
Add an input file to a task, without caching.
t | The task to which to add parameters |
fname | The name of the data file to send to the worker to be available to the commands. |
rname | The name of the file in which to store the file data on the worker. |
void work_queue_task_specify_output_file | ( | struct work_queue_task * | t, |
const char * | rname, | ||
const char * | fname | ||
) |
Add an output file to a task.
t | The task to which to add parameters |
rname | The name of a file created by the program when it runs. |
fname | The name of the file local target for copying rname back. |
void work_queue_task_specify_output_file_do_not_cache | ( | struct work_queue_task * | t, |
const char * | rname, | ||
const char * | fname | ||
) |
Add an output file to a task without caching.
t | The task to which to add parameters |
rname | The name of a file created by the program when it runs. |
fname | The name of the file local target for copying rname back. |
double wq_option_fast_abort_multiplier |
Initial setting for fast abort multiplier upon creating queue.
Turned off if less than 0. Change this prior to calling work_queue_create; after the queue is created this variable is not considered and changes must be made through the API calls.
int wq_option_scheduler |
Initial setting for algorithm to assign tasks to workers upon creating queue.
Change this prior to calling work_queue_create; after the queue is created this variable is not considered and changes must be made through the API calls.