Gearman Developer Documentation

examples/reverse_worker.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <libgearman/gearman.h>
00021 
/**
 * Bit flags selecting optional behaviour of the reverse worker.
 * Flags are OR-ed together as the matching command-line switches are seen.
 */
typedef enum
{
  REVERSE_WORKER_OPTIONS_NONE   = 0x0, /* no optional behaviour selected */
  REVERSE_WORKER_OPTIONS_DATA   = 0x1, /* -d: send the result back in data chunks */
  REVERSE_WORKER_OPTIONS_STATUS = 0x2, /* -s: send status updates while running */
  REVERSE_WORKER_OPTIONS_UNIQUE = 0x4  /* -u: grab the unique id with each job */
} reverse_worker_options_t;
00029 
00030 static void *reverse(gearman_job_st *job, void *context,
00031                      size_t *result_size, gearman_return_t *ret_ptr);
00032 
00033 static void usage(char *name);
00034 
00035 int main(int argc, char *argv[])
00036 {
00037   int c;
00038   uint32_t count= 0;
00039   char *host= NULL;
00040   in_port_t port= 0;
00041   reverse_worker_options_t options= REVERSE_WORKER_OPTIONS_NONE;
00042   int timeout= -1;
00043   gearman_return_t ret;
00044   gearman_worker_st worker;
00045 
00046   while ((c = getopt(argc, argv, "c:dh:p:st:u")) != -1)
00047   {
00048     switch(c)
00049     {
00050     case 'c':
00051       count= (uint32_t)atoi(optarg);
00052       break;
00053 
00054     case 'd':
00055       options|= REVERSE_WORKER_OPTIONS_DATA;
00056       break;
00057 
00058     case 'h':
00059       host= optarg;
00060       break;
00061 
00062     case 'p':
00063       port= (in_port_t)atoi(optarg);
00064       break;
00065 
00066     case 's':
00067       options|= REVERSE_WORKER_OPTIONS_STATUS;
00068       break;
00069 
00070     case 't':
00071       timeout= atoi(optarg);
00072       break;
00073 
00074     case 'u':
00075       options|= REVERSE_WORKER_OPTIONS_UNIQUE;
00076       break;
00077 
00078     default:
00079       usage(argv[0]);
00080       exit(1);
00081     }
00082   }
00083 
00084   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
00085   {
00086     fprintf(stderr, "signal:%d\n", errno);
00087     exit(1);
00088   }
00089 
00090   if (gearman_worker_create(&worker) == NULL)
00091   {
00092     fprintf(stderr, "Memory allocation failure on worker creation\n");
00093     exit(1);
00094   }
00095 
00096   if (options & REVERSE_WORKER_OPTIONS_UNIQUE)
00097     gearman_worker_add_options(&worker, GEARMAN_WORKER_GRAB_UNIQ);
00098 
00099   if (timeout >= 0)
00100     gearman_worker_set_timeout(&worker, timeout);
00101 
00102   ret= gearman_worker_add_server(&worker, host, port);
00103   if (ret != GEARMAN_SUCCESS)
00104   {
00105     fprintf(stderr, "%s\n", gearman_worker_error(&worker));
00106     exit(1);
00107   }
00108 
00109   ret= gearman_worker_add_function(&worker, "reverse", 0, reverse,
00110                                    &options);
00111   if (ret != GEARMAN_SUCCESS)
00112   {
00113     fprintf(stderr, "%s\n", gearman_worker_error(&worker));
00114     exit(1);
00115   }
00116 
00117   while (1)
00118   {
00119     ret= gearman_worker_work(&worker);
00120     if (ret != GEARMAN_SUCCESS)
00121     {
00122       fprintf(stderr, "%s\n", gearman_worker_error(&worker));
00123       break;
00124     }
00125 
00126     if (count > 0)
00127     {
00128       count--;
00129       if (count == 0)
00130         break;
00131     }
00132   }
00133 
00134   gearman_worker_free(&worker);
00135 
00136   return 0;
00137 }
00138 
00139 static void *reverse(gearman_job_st *job, void *context,
00140                      size_t *result_size, gearman_return_t *ret_ptr)
00141 {
00142   reverse_worker_options_t options= *((reverse_worker_options_t *)context);
00143   const uint8_t *workload;
00144   uint8_t *result;
00145   size_t x;
00146   size_t y;
00147 
00148   workload= gearman_job_workload(job);
00149   *result_size= gearman_job_workload_size(job);
00150 
00151   result= malloc(*result_size);
00152   if (result == NULL)
00153   {
00154     fprintf(stderr, "malloc:%d\n", errno);
00155     *ret_ptr= GEARMAN_WORK_FAIL;
00156     return NULL;
00157   }
00158 
00159   for (y= 0, x= *result_size; x; x--, y++)
00160   {
00161     result[y]= ((uint8_t *)workload)[x - 1];
00162 
00163     if (options & REVERSE_WORKER_OPTIONS_DATA)
00164     {
00165       *ret_ptr= gearman_job_send_data(job, &(result[y]), 1);
00166       if (*ret_ptr != GEARMAN_SUCCESS)
00167       {
00168         free(result);
00169         return NULL;
00170       }
00171     }
00172 
00173     if (options & REVERSE_WORKER_OPTIONS_STATUS)
00174     {
00175       *ret_ptr= gearman_job_send_status(job, (uint32_t)y,
00176                                         (uint32_t)*result_size);
00177       if (*ret_ptr != GEARMAN_SUCCESS)
00178       {
00179         free(result);
00180         return NULL;
00181       }
00182 
00183       sleep(1);
00184     }
00185   }
00186 
00187   printf("Job=%s%s%s Workload=%.*s Result=%.*s\n", gearman_job_handle(job),
00188          options & REVERSE_WORKER_OPTIONS_UNIQUE ? " Unique=" : "",
00189          options & REVERSE_WORKER_OPTIONS_UNIQUE ? gearman_job_unique(job) : "",
00190          (int)*result_size, workload, (int)*result_size, result);
00191 
00192   *ret_ptr= GEARMAN_SUCCESS;
00193 
00194   if (options & REVERSE_WORKER_OPTIONS_DATA)
00195   {
00196     *result_size= 0;
00197     return NULL;
00198   }
00199 
00200   return result;
00201 }
00202 
/**
 * Print the command-line synopsis and a one-line summary of every option
 * to standard output.
 *
 * @param name  Program name shown in the synopsis.
 */
static void usage(char *name)
{
  printf("\nusage: %s [-c <count>] [-d] [-h <host>] [-p <port>] [-s]"
         " [-t <timeout>] [-u]\n", name);
  printf("\t-c <count>   - number of jobs to run before exiting\n");
  printf("\t-d           - send result back in data chunks\n");
  printf("\t-h <host>    - job server host\n");
  printf("\t-p <port>    - job server port\n");
  printf("\t-s           - send status updates and sleep while running job\n");
  printf("\t-t <timeout> - timeout in milliseconds\n");
  printf("\t-u           - when grabbing jobs, grab the unique id\n");
}