cctools
work_queue.h
Go to the documentation of this file.
1 /*
2 Copyright (C) 2008- The University of Notre Dame
3 This software is distributed under the GNU General Public License.
4 See the file COPYING for details.
5 */
6 
7 #ifndef WORK_QUEUE_H
8 #define WORK_QUEUE_H
9 
20 #include "timestamp.h"
21 
22 #define WORK_QUEUE_DEFAULT_PORT 9123
23 #define WORK_QUEUE_RANDOM_PORT 0
24 #define WORK_QUEUE_WAITFORTASK -1
26 #define WORK_QUEUE_SCHEDULE_UNSET 0
27 #define WORK_QUEUE_SCHEDULE_FCFS 1
28 #define WORK_QUEUE_SCHEDULE_FILES 2
29 #define WORK_QUEUE_SCHEDULE_TIME 3
30 #define WORK_QUEUE_SCHEDULE_RAND 4
32 #define WORK_QUEUE_TASK_ORDER_FIFO 0
33 #define WORK_QUEUE_TASK_ORDER_LIFO 1
35 #define WORK_QUEUE_INPUT 0
36 #define WORK_QUEUE_OUTPUT 1
38 #define WORK_QUEUE_NOCACHE 0
39 #define WORK_QUEUE_CACHE 1
40 #define WORK_QUEUE_SYMLINK 2 /* Create a symlink to the file rather than copying it, if possible. */
41 #define WORK_QUEUE_PREEXIST 4 /* If the filename already exists on the host, use it in place. */
42 #define WORK_QUEUE_THIRDGET 8 /* Access the file on the client from a shared filesystem */
43 #define WORK_QUEUE_THIRDPUT 8 /* Access the file on the client from a shared filesystem (included for readability) */
44 
45 #define WORK_QUEUE_MASTER_MODE_STANDALONE 0
46 #define WORK_QUEUE_MASTER_MODE_CATALOG 1
48 extern double wq_option_fast_abort_multiplier;
49 extern int wq_option_scheduler;
53 struct work_queue_task {
54  char *tag;
55  char *command_line;
57  char *output;
58  struct list *input_files;
59  struct list *output_files;
60  int taskid;
62  int result;
63  char *host;
78 };
79 
83  int port;
84  int priority;
96  INT64_T total_bytes_sent;
101  double efficiency;
102  double idle_percentage;
103  int capacity;
104  int avg_capacity;
105  int total_workers_connected;
106 };
107 
111 
118 struct work_queue_task *work_queue_task_create(const char *full_command);
119 
131 void work_queue_task_specify_file(struct work_queue_task *t, const char *local_name, const char *remote_name, int type, int flags);
132 
142 void work_queue_task_specify_buffer(struct work_queue_task *t, const char *data, int length, const char *remote_name, int flags);
143 
144 /* Add a file created or handled by an arbitrary command to a task (eg: wget, ftp, chirp_get|put).
145 @param t A task object.
146 @param remote_name The name of the file at the execution site.
147 @param cmd The command to run on the remote node to retrieve or store the file.
148 @param type Must be one of the following values:
149 - @ref WORK_QUEUE_INPUT to indicate an input file to be consumed by the task
150 - @ref WORK_QUEUE_OUTPUT to indicate an output file to be produced by the task
151 @param flags May be zero to indicate no special handling or any of the following or'd together:
152 - @ref WORK_QUEUE_CACHE indicates that the file should be cached for later tasks. (recommended)
153 - @ref WORK_QUEUE_NOCACHE indicates that the file should not be cached for later tasks.
154 */
155 void work_queue_task_specify_file_command(struct work_queue_task *t, const char *remote_name, const char *cmd, int type, int flags);
156 
163 void work_queue_task_specify_tag(struct work_queue_task *t, const char *tag);
164 
174 void work_queue_task_specify_algorithm(struct work_queue_task *t, int algo );
175 
181 
183 
187 
204 struct work_queue *work_queue_create(int port);
205 
214 int work_queue_submit(struct work_queue *q, struct work_queue_task *t);
215 
230 struct work_queue_task *work_queue_wait(struct work_queue *q, int timeout);
231 
243 int work_queue_hungry(struct work_queue *q);
244 
252 int work_queue_empty(struct work_queue *q);
253 
260 int work_queue_port(struct work_queue *q);
261 
266 const char *work_queue_name(struct work_queue *q);
267 
272 void work_queue_get_stats(struct work_queue *q, struct work_queue_stats *s);
273 
280 char * work_queue_get_worker_summary( struct work_queue *q );
281 
287 int work_queue_activate_fast_abort(struct work_queue *q, double multiplier);
288 
300 void work_queue_specify_algorithm(struct work_queue *q, int algo);
301 
311 void work_queue_specify_task_order(struct work_queue *q, int order);
312 
317 void work_queue_specify_name(struct work_queue *q, const char *name);
318 
323 void work_queue_specify_priority(struct work_queue *q, int priority);
324 
331 void work_queue_specify_master_mode(struct work_queue *q, int mode);
332 
338 struct work_queue_task *work_queue_cancel_by_taskid(struct work_queue *q, int id);
339 
345 struct work_queue_task *work_queue_cancel_by_tasktag(struct work_queue *q, const char *tag);
346 
351 int work_queue_shut_down_workers(struct work_queue *q, int n);
352 
357 void work_queue_delete(struct work_queue *q);
358 
360 
364 
372 void work_queue_task_specify_input_buf(struct work_queue_task *t, const char *buf, int length, const char *rname);
373 
380 void work_queue_task_specify_input_file(struct work_queue_task *t, const char *fname, const char *rname);
381 
388 void work_queue_task_specify_input_file_do_not_cache(struct work_queue_task *t, const char *fname, const char *rname);
389 
396 void work_queue_task_specify_output_file(struct work_queue_task *t, const char *rname, const char *fname);
397 
404 void work_queue_task_specify_output_file_do_not_cache(struct work_queue_task *t, const char *rname, const char *fname);
405 
407 
408 
409 #endif