Gearman Developer Documentation

libgearman-server/queue_libtokyocabinet.c
Go to the documentation of this file.
00001 
00006 #include "common.h"
00007 
00008 #include <libgearman-server/queue_libtokyocabinet.h>
00009 #include <tcutil.h>
00010 #include <tcadb.h>
00011 
00018 /*
00019  * Private declarations
00020  */
00021 
00025 typedef struct
00026 {
00027   TCADB *db;
00028 } gearman_queue_libtokyocabinet_st;
00029 
00030 /* Queue callback functions. */
00031 static gearman_return_t _libtokyocabinet_add(gearman_server_st *server, void *context,
00032                                              const void *unique,
00033                                              size_t unique_size,
00034                                              const void *function_name,
00035                                              size_t function_name_size,
00036                                              const void *data, size_t data_size,
00037                                              gearman_job_priority_t priority);
00038 static gearman_return_t _libtokyocabinet_flush(gearman_server_st *server, void *context);
00039 static gearman_return_t _libtokyocabinet_done(gearman_server_st *server, void *context,
00040                                               const void *unique,
00041                                               size_t unique_size, 
00042                                               const void *function_name, 
00043                                               size_t function_name_size);
00044 static gearman_return_t _libtokyocabinet_replay(gearman_server_st *server, void *context,
00045                                                 gearman_queue_add_fn *add_fn,
00046                                                 void *add_context);
00047 
00051 static const char * _libtokyocabinet_tcaerrmsg(TCADB *db)
00052 {
00053   switch (tcadbomode(db))
00054   {
00055   case ADBOHDB:
00056     return tcerrmsg(tchdbecode((TCHDB *)tcadbreveal(db)));
00057   case ADBOBDB:
00058     return tcerrmsg(tcbdbecode((TCBDB *)tcadbreveal(db)));
00059   default:
00060     return tcerrmsg(TCEMISC);
00061   }
00062 }
00063 
00064 /*
00065  * Public definitions
00066  */
00067 
00068 gearman_return_t gearman_server_queue_libtokyocabinet_conf(gearman_conf_st *conf)
00069 {
00070   gearman_conf_module_st *module;
00071 
00072   module= gearman_conf_module_create(conf, NULL, "libtokyocabinet");
00073   if (module == NULL)
00074     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00075 
00076   gearman_conf_module_add_option(module, "file", 0, "FILE_NAME",
00077                                  "File name of the database. [see: man tcadb, tcadbopen() for name guidelines]");
00078   gearman_conf_module_add_option(module, "optimize", 0, "yes/no",
00079                                  "Optimize database on open. [default=yes]");
00080   return gearman_conf_return(conf);
00081 }
00082 
00083 gearman_return_t gearman_queue_libtokyocabinet_init(gearman_server_st *server,
00084                                                     gearman_conf_st *conf)
00085 {
00086   gearman_queue_libtokyocabinet_st *queue;
00087   gearman_conf_module_st *module;
00088   const char *name;
00089   const char *value;
00090   const char *opt_file= NULL;
00091   const char *opt_optimize= NULL;
00092 
00093   gearman_log_info(server->gearman, "Initializing libtokyocabinet module");
00094 
00095   queue= calloc(1, sizeof(gearman_queue_libtokyocabinet_st));
00096   if (queue == NULL)
00097   {
00098     gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init", "malloc");
00099     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00100   }
00101 
00102   if ((queue->db= tcadbnew()) == NULL)
00103   {
00104     free(queue);
00105     gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00106                              "tcadbnew");
00107     return GEARMAN_QUEUE_ERROR;
00108   }
00109    
00110   /* Get module and parse the option values that were given. */
00111   module= gearman_conf_module_find(conf, "libtokyocabinet");
00112   if (module == NULL)
00113   {
00114     gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00115                               "modconf_module_find:NULL");
00116     return GEARMAN_QUEUE_ERROR;
00117   }
00118 
00119   while (gearman_conf_module_value(module, &name, &value))
00120   { 
00121     if (!strcmp(name, "file"))
00122       opt_file= value;
00123     else if (!strcmp(name, "optimize"))
00124       opt_optimize= value;
00125     else
00126     {
00127       tcadbdel(queue->db);
00128       free(queue);
00129       gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00130                                "Unknown argument: %s", name);
00131       return GEARMAN_QUEUE_ERROR;
00132     }
00133   }
00134      
00135   if (opt_file == NULL)
00136   {
00137     gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00138                              "No --file given");
00139     return GEARMAN_QUEUE_ERROR;
00140   }
00141 
00142   if (!tcadbopen(queue->db, opt_file))
00143   {
00144     tcadbdel(queue->db);
00145     free(queue);
00146 
00147     gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00148                              "tcadbopen(%s): %s", opt_file, _libtokyocabinet_tcaerrmsg(queue->db));
00149 
00150     return GEARMAN_QUEUE_ERROR;
00151   }
00152 
00153   if (opt_optimize == NULL || !strcasecmp(opt_optimize, "yes"))
00154   {
00155     gearman_log_info(server->gearman, "libtokyocabinet optimizing database file");
00156     if (!tcadboptimize(queue->db, NULL))
00157     {
00158       tcadbdel(queue->db);
00159       free(queue);
00160       gearman_log_error(server->gearman, "gearman_queue_libtokyocabinet_init",
00161                                "tcadboptimize");
00162 
00163       return GEARMAN_QUEUE_ERROR;
00164     }
00165   }
00166 
00167   gearman_server_set_queue_context(server, queue);
00168 
00169   gearman_server_set_queue_add_fn(server, _libtokyocabinet_add);
00170   gearman_server_set_queue_flush_fn(server, _libtokyocabinet_flush);
00171   gearman_server_set_queue_done_fn(server, _libtokyocabinet_done);
00172   gearman_server_set_queue_replay_fn(server, _libtokyocabinet_replay);   
00173    
00174   return GEARMAN_SUCCESS;
00175 }
00176    
00177 gearman_return_t gearman_queue_libtokyocabinet_deinit(gearman_server_st *server)
00178 {
00179   gearman_queue_libtokyocabinet_st *queue;
00180 
00181   gearman_log_info(server->gearman, "Shutting down libtokyocabinet queue module");
00182 
00183   queue= (gearman_queue_libtokyocabinet_st *)gearman_server_queue_context(server);
00184   gearman_server_set_queue_context(server, NULL);
00185   tcadbdel(queue->db);
00186 
00187   free(queue);
00188 
00189   return GEARMAN_SUCCESS;
00190 }
00191 
00192 gearman_return_t gearmand_queue_libtokyocabinet_init(gearmand_st *gearmand,
00193                                                      gearman_conf_st *conf)
00194 {
00195   return gearman_queue_libtokyocabinet_init(&(gearmand->server), conf);
00196 }
00197 
00198 gearman_return_t gearmand_queue_libtokyocabinet_deinit(gearmand_st *gearmand)
00199 {
00200   return gearman_queue_libtokyocabinet_deinit(&(gearmand->server));
00201 }
00202 
00203 /*
00204  * Private definitions
00205  */
00206 
00207 static gearman_return_t _libtokyocabinet_add(gearman_server_st *server, void *context,
00208                                              const void *unique,
00209                                              size_t unique_size,
00210                                              const void *function_name,
00211                                              size_t function_name_size,
00212                                              const void *data, size_t data_size,
00213                                              gearman_job_priority_t priority)
00214 {
00215   gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context;
00216   bool rc;
00217   TCXSTR *key;
00218   TCXSTR *job_data;
00219 
00220   gearman_log_debug(server->gearman, "libtokyocabinet add: %.*s", (uint32_t)unique_size, (char *)unique);
00221 
00222   char key_str[GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN];
00223   size_t key_length= (size_t)snprintf(key_str, GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN, "%.*s-%.*s",
00224                                (int)function_name_size,
00225                                (const char *)function_name, (int)unique_size,
00226                                (const char *)unique);
00227 
00228   key= tcxstrnew();
00229   tcxstrcat(key, key_str, (int)key_length);
00230 
00231   gearman_log_debug(server->gearman, "libtokyocabinet key: %.*s", (int)key_length, key_str);
00232 
00233   job_data= tcxstrnew();
00234 
00235   tcxstrcat(job_data, (const char *)function_name, (int)function_name_size);
00236   tcxstrcat(job_data, "\0", 1);
00237   tcxstrcat(job_data, (const char *)unique, (int)unique_size);
00238   tcxstrcat(job_data, "\0", 1);
00239 
00240   switch (priority)
00241   {
00242    case GEARMAN_JOB_PRIORITY_HIGH:
00243    case GEARMAN_JOB_PRIORITY_MAX:     
00244      tcxstrcat2(job_data,"0");
00245      break;
00246    case GEARMAN_JOB_PRIORITY_LOW:
00247      tcxstrcat2(job_data,"2");
00248      break;
00249    case GEARMAN_JOB_PRIORITY_NORMAL:
00250    default:
00251      tcxstrcat2(job_data,"1");
00252   }
00253 
00254   tcxstrcat(job_data, (const char *)data, (int)data_size);
00255 
00256   rc= tcadbput(queue->db, tcxstrptr(key), tcxstrsize(key),
00257                tcxstrptr(job_data), tcxstrsize(job_data));
00258 
00259   tcxstrdel(key);
00260   tcxstrdel(job_data);
00261 
00262   if (!rc)
00263     return GEARMAN_QUEUE_ERROR;
00264 
00265   return GEARMAN_SUCCESS;
00266 }
00267 
00268 static gearman_return_t _libtokyocabinet_flush(gearman_server_st *server,
00269                                                void *context __attribute__((unused)))
00270 {
00271   gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context;
00272    
00273   gearman_log_debug(server->gearman, "libtokyocabinet flush");
00274 
00275   if (!tcadbsync(queue->db))
00276      return GEARMAN_QUEUE_ERROR;
00277    
00278   return GEARMAN_SUCCESS;
00279 }
00280 
00281 static gearman_return_t _libtokyocabinet_done(gearman_server_st *server, void *context,
00282                                               const void *unique,
00283                                               size_t unique_size, 
00284                                               const void *function_name,
00285                                               size_t function_name_size)
00286 {
00287   gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context;
00288   bool rc;
00289   TCXSTR *key;
00290 
00291   (void) function_name;
00292   (void) function_name_size;   
00293   gearman_log_debug(server->gearman, "libtokyocabinet add: %.*s", (uint32_t)unique_size, (char *)unique);
00294   
00295   char key_str[GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN];
00296   size_t key_length= (size_t)snprintf(key_str, GEARMAN_QUEUE_TOKYOCABINET_MAX_KEY_LEN, "%.*s-%.*s",
00297                                (int)function_name_size,
00298                                (const char *)function_name, (int)unique_size,
00299                                (const char *)unique);
00300 
00301   key= tcxstrnew();
00302   tcxstrcat(key, key_str, (int)key_length);
00303   rc= tcadbout(queue->db, tcxstrptr(key), tcxstrsize(key));
00304   tcxstrdel(key);
00305 
00306   if (!rc)
00307     return GEARMAN_QUEUE_ERROR;
00308 
00309   return GEARMAN_SUCCESS;
00310 }
00311 
00312 static gearman_return_t _callback_for_record(gearman_server_st *server,
00313                                              TCXSTR *key, TCXSTR *data,
00314                                              gearman_queue_add_fn *add_fn,
00315                                              void *add_context)
00316 {
00317   char *data_cstr;
00318   size_t data_cstr_size;
00319   const char *function;
00320   size_t function_len;
00321   char *unique;
00322   size_t unique_len;
00323   gearman_job_priority_t priority;
00324   gearman_return_t gret;
00325   
00326   gearman_log_debug(server->gearman, "replaying: %s", (char *) tcxstrptr(key));
00327 
00328   data_cstr= (char *)tcxstrptr(data);
00329   data_cstr_size= (size_t)tcxstrsize(data);
00330 
00331   function= data_cstr;
00332   function_len= strlen(function);
00333 
00334   unique= data_cstr+function_len+1;
00335   unique_len= strlen(unique); // strlen is only safe because tcxstrptr guarantees nul term
00336 
00337   // +2 for nulls
00338   data_cstr += unique_len+function_len+2;
00339   data_cstr_size -= unique_len+function_len+2;
00340 
00341   assert(unique);
00342   assert(unique_len);
00343   assert(function);
00344   assert(function_len);
00345 
00346   // single char for priority
00347   if (*data_cstr == '2')
00348     priority = GEARMAN_JOB_PRIORITY_LOW;
00349   else if (*data_cstr == '0')
00350     priority = GEARMAN_JOB_PRIORITY_HIGH;
00351   else
00352     priority = GEARMAN_JOB_PRIORITY_NORMAL;
00353 
00354   ++data_cstr;
00355   --data_cstr_size;
00356 
00357   // data is freed later so we must make a copy
00358   void *data_ptr= (void *)malloc(data_cstr_size);
00359   if (data_ptr == NULL)
00360   {
00361     return GEARMAN_QUEUE_ERROR;
00362   }
00363   memcpy(data_ptr, data_cstr, data_cstr_size); 
00364 
00365   gret = (*add_fn)(server, add_context, unique, unique_len,
00366                    function, function_len,
00367                    data_ptr, data_cstr_size,
00368                    priority);
00369 
00370   if (gret != GEARMAN_SUCCESS)
00371   {
00372      return gret;
00373   }   
00374   return GEARMAN_SUCCESS;
00375 }
00376 
00377 
00378 static gearman_return_t _libtokyocabinet_replay(gearman_server_st *server, void *context,
00379                                                 gearman_queue_add_fn *add_fn,
00380                                                 void *add_context)
00381 {
00382   gearman_queue_libtokyocabinet_st *queue= (gearman_queue_libtokyocabinet_st *)context;
00383   TCXSTR *key;
00384   TCXSTR *data;
00385   void *iter= NULL;
00386   int iter_size= 0;
00387   gearman_return_t gret;
00388   gearman_return_t tmp_gret;   
00389    
00390   gearman_log_info(server->gearman, "libtokyocabinet replay start");
00391   
00392   if (!tcadbiterinit(queue->db))
00393   {
00394     return GEARMAN_QUEUE_ERROR;
00395   }
00396   key= tcxstrnew();
00397   data= tcxstrnew();
00398   gret= GEARMAN_SUCCESS;
00399   uint64_t x= 0;
00400   while ((iter= tcadbiternext(queue->db, &iter_size)))
00401   {     
00402     tcxstrclear(key);
00403     tcxstrclear(data);
00404     tcxstrcat(key, iter, iter_size);
00405     free(iter);
00406     iter= tcadbget(queue->db, tcxstrptr(key), tcxstrsize(key), &iter_size);
00407     if (! iter) {
00408       gearman_log_info(server->gearman, "libtokyocabinet replay key disappeared: %s", (char *)tcxstrptr(key));
00409       continue;
00410     }
00411     tcxstrcat(data, iter, iter_size);
00412     free(iter);
00413     tmp_gret= _callback_for_record(server, key, data, add_fn, add_context);
00414     if (tmp_gret != GEARMAN_SUCCESS)
00415     {
00416       gret= GEARMAN_QUEUE_ERROR;
00417       break;
00418     }
00419     ++x;
00420   }
00421   tcxstrdel(key);
00422   tcxstrdel(data);
00423 
00424   gearman_log_info(server->gearman, "libtokyocabinet replayed %ld records", x);
00425 
00426   return gret;
00427 }