Gearman Developer Documentation

libgearman-server/queue_libmemcached.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00016 #include <libgearman-server/queue_libmemcached.h>
00017 #include <libmemcached/memcached.h>
00018 
00028 #define GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX "gear_"
00029 
00033 typedef struct
00034 {
00035   memcached_st memc;
00036 } gearman_queue_libmemcached_st;
00037 
00038 /* Queue callback functions. */
00039 static gearman_return_t _libmemcached_add(gearman_server_st *server,
00040                                           void *context, const void *unique,
00041                                           size_t unique_size,
00042                                           const void *function_name,
00043                                           size_t function_name_size,
00044                                           const void *data, size_t data_size,
00045                                           gearman_job_priority_t priority);
00046 static gearman_return_t _libmemcached_flush(gearman_server_st *server,
00047                                             void *context);
00048 static gearman_return_t _libmemcached_done(gearman_server_st *server,
00049                                            void *context, const void *unique,
00050                                            size_t unique_size,
00051                                            const void *function_name,
00052                                            size_t function_name_size);
00053 static gearman_return_t _libmemcached_replay(gearman_server_st *server,
00054                                              void *context,
00055                                              gearman_queue_add_fn *add_fn,
00056                                              void *add_context);
00057 
00060 /*
00061  * Public definitions
00062  */
00063 
00064 gearman_return_t gearman_server_queue_libmemcached_conf(gearman_conf_st *conf)
00065 {
00066   gearman_conf_module_st *module;
00067 
00068   module= gearman_conf_module_create(conf, NULL, "libmemcached");
00069   if (module == NULL)
00070     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00071 
00072   gearman_conf_module_add_option(module, "servers", 0, "SERVER_LIST",
00073                                  "List of Memcached servers to use.");
00074   return gearman_conf_return(conf);
00075 }
00076 
00077 gearman_return_t
00078 gearman_server_queue_libmemcached_init(gearman_server_st *server,
00079                                        gearman_conf_st *conf)
00080 {
00081   gearman_queue_libmemcached_st *queue;
00082   gearman_conf_module_st *module;
00083   const char *name;
00084   const char *value;
00085   memcached_server_st *servers;
00086   const char *opt_servers= NULL;
00087 
00088   gearman_log_info(server->gearman, "Initializing libmemcached module");
00089 
00090   queue= calloc(1, sizeof(gearman_queue_libmemcached_st));
00091   if (queue == NULL)
00092   {
00093     gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "malloc");
00094 
00095     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00096   }
00097 
00098   if (memcached_create(&(queue->memc)) == NULL)
00099   {
00100     free(queue);
00101     gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "memcached_create");
00102     return GEARMAN_QUEUE_ERROR;
00103   }
00104 
00105   /* Get module and parse the option values that were given. */
00106   module= gearman_conf_module_find(conf, "libmemcached");
00107   if (module == NULL)
00108   {
00109     gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "gearman_conf_module_find:NULL");
00110 
00111     return GEARMAN_QUEUE_ERROR;
00112   }
00113 
00114   while (gearman_conf_module_value(module, &name, &value))
00115   {
00116     if (!strcmp(name, "servers"))
00117       opt_servers= value;
00118     else
00119     {
00120       memcached_free(&(queue->memc));
00121       free(queue);
00122       gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "Unknown argument: %s", name);
00123       return GEARMAN_QUEUE_ERROR;
00124     }
00125   }
00126 
00127   if (opt_servers == NULL)
00128   {
00129     gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "No --servers given");
00130     return GEARMAN_QUEUE_ERROR;
00131   }
00132 
00133   servers= memcached_servers_parse(opt_servers);
00134 
00135   if (servers == NULL)
00136   {
00137     memcached_free(&(queue->memc));
00138     free(queue);
00139 
00140     gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "memcached_servers_parse");
00141 
00142     return GEARMAN_QUEUE_ERROR;
00143   }
00144 
00145   memcached_server_push(&queue->memc, servers);
00146   memcached_server_list_free(servers);
00147 
00148   gearman_server_set_queue_context(server, queue);
00149 
00150   gearman_server_set_queue_add_fn(server, _libmemcached_add);
00151   gearman_server_set_queue_flush_fn(server, _libmemcached_flush);
00152   gearman_server_set_queue_done_fn(server, _libmemcached_done);
00153   gearman_server_set_queue_replay_fn(server, _libmemcached_replay);
00154 
00155   return GEARMAN_SUCCESS;
00156 }
00157 
00158 gearman_return_t
00159 gearman_server_queue_libmemcached_deinit(gearman_server_st *server)
00160 {
00161   gearman_queue_libmemcached_st *queue;
00162 
00163   gearman_log_info(server->gearman, "Shutting down libmemcached queue module");
00164 
00165   queue= (gearman_queue_libmemcached_st *)gearman_server_queue_context(server);
00166   gearman_server_set_queue_context(server, NULL);
00167   memcached_free(&(queue->memc));
00168 
00169   free(queue);
00170 
00171   return GEARMAN_SUCCESS;
00172 }
00173 
00174 gearman_return_t gearmand_queue_libmemcached_init(gearmand_st *gearmand,
00175                                                   gearman_conf_st *conf)
00176 {
00177   return gearman_server_queue_libmemcached_init(&(gearmand->server), conf);
00178 }
00179 
00180 gearman_return_t gearmand_queue_libmemcached_deinit(gearmand_st *gearmand)
00181 {
00182   return gearman_server_queue_libmemcached_deinit(&(gearmand->server));
00183 }
00184 
00185 /*
00186  * Static definitions
00187  */
00188 
00189 static gearman_return_t _libmemcached_add(gearman_server_st *server,
00190                                           void *context, const void *unique,
00191                                           size_t unique_size,
00192                                           const void *function_name,
00193                                           size_t function_name_size,
00194                                           const void *data, size_t data_size,
00195                                           gearman_job_priority_t priority)
00196 {
00197   gearman_queue_libmemcached_st *queue= (gearman_queue_libmemcached_st *)context;
00198   memcached_return rc;
00199   char key[MEMCACHED_MAX_KEY];
00200   size_t key_length;
00201 
00202   gearman_log_debug(server->gearman, "libmemcached add: %.*s", (uint32_t)unique_size, (char *)unique);
00203 
00204   key_length= (size_t)snprintf(key, MEMCACHED_MAX_KEY, "%s%.*s-%.*s",
00205                                GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX,
00206                                (int)function_name_size,
00207                                (const char *)function_name, (int)unique_size,
00208                                (const char *)unique);
00209 
00210   rc= memcached_set(&queue->memc, (const char *)key, key_length,
00211                     (const char *)data, data_size, 0, (uint32_t)priority);
00212 
00213   if (rc != MEMCACHED_SUCCESS)
00214     return GEARMAN_QUEUE_ERROR;
00215 
00216   return GEARMAN_SUCCESS;
00217 }
00218 
00219 static gearman_return_t _libmemcached_flush(gearman_server_st *server,
00220                                            void *context __attribute__((unused)))
00221 {
00222   gearman_log_debug(server->gearman, "libmemcached flush");
00223 
00224   return GEARMAN_SUCCESS;
00225 }
00226 
00227 static gearman_return_t _libmemcached_done(gearman_server_st *server,
00228                                            void *context, const void *unique,
00229                                            size_t unique_size,
00230                                            const void *function_name,
00231                                            size_t function_name_size)
00232 {
00233   size_t key_length;
00234   char key[MEMCACHED_MAX_KEY];
00235   memcached_return rc;
00236   gearman_queue_libmemcached_st *queue= (gearman_queue_libmemcached_st *)context;
00237 
00238   gearman_log_debug(server->gearman, "libmemcached done: %.*s", (uint32_t)unique_size, (char *)unique);
00239 
00240   key_length= (size_t)snprintf(key, MEMCACHED_MAX_KEY, "%s%.*s-%.*s",
00241                                GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX,
00242                                (int)function_name_size,
00243                                (const char *)function_name, (int)unique_size,
00244                                (const char *)unique);
00245 
00246   /* For the moment we will assume it happened */
00247   rc= memcached_delete(&queue->memc, (const char *)key, key_length, 0);
00248 
00249   if (rc != MEMCACHED_SUCCESS)
00250     return GEARMAN_QUEUE_ERROR;
00251 
00252   return GEARMAN_SUCCESS;
00253 }
00254 
00255 struct replay_context
00256 {
00257   memcached_st clone;
00258   gearman_server_st *server;
00259   gearman_queue_add_fn *add_fn;
00260   void *add_context;
00261 };
00262 
00263 static memcached_return callback_loader(const memcached_st *ptr __attribute__((unused)),
00264                                         memcached_result_st *result __attribute__((unused)),
00265                                         void *context)
00266 {
00267   struct replay_context *container= (struct replay_context *)context;
00268   const char *key;
00269   const char *unique;
00270   char *function;
00271   size_t unique_len;
00272 
00273   key= memcached_result_key_value(result);
00274 
00275   if (strcmp(key, GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX))
00276     return MEMCACHED_SUCCESS;
00277 
00278   unique= key + strlen(GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX);
00279 
00280   function= index(unique, '-');
00281   unique_len= (size_t)(function - unique);
00282   function++;
00283 
00284   assert(unique);
00285   assert(unique_len);
00286   assert(function);
00287   assert(strlen(function));
00288   /* Currently not looking at failure cases */
00289   (void)(*container->add_fn)(container->server, container->add_context,
00290                              unique, unique_len,
00291                              function, strlen(function),
00292                              memcached_result_value(result), memcached_result_length(result),
00293                              memcached_result_flags(result));
00294 
00295 
00296   return MEMCACHED_SUCCESS;
00297 }
00298 
00299 /* Grab the object and load it into the loader */
00300 static memcached_return callback_for_key(const memcached_st *ptr __attribute__((unused)),
00301                                          const char *key, size_t key_length,
00302                                          void *context)
00303 {
00304   memcached_return rc;
00305   struct replay_context *container= (struct replay_context *)context;
00306   memcached_execute_function callbacks[1];
00307   char *passable[1];
00308 
00309   callbacks[0]= (memcached_execute_fn)&callback_loader;
00310 
00311   passable[0]= (char *)key;
00312   rc= memcached_mget(&container->clone, (void *)passable, &key_length, 1);
00313 
00314   /* Just void errors for the moment, since other treads might have picked up the object. */
00315   (void)memcached_fetch_execute(&container->clone, callbacks, context, 1);
00316 
00317   return MEMCACHED_SUCCESS;
00318 }
00319 
00320 /*
00321   If we have any failures for loading values back into replay we just ignore them.
00322 */
00323 static gearman_return_t _libmemcached_replay(gearman_server_st *server, void *context,
00324                                              gearman_queue_add_fn *add_fn,
00325                                              void *add_context)
00326 {
00327   gearman_queue_libmemcached_st *queue= (gearman_queue_libmemcached_st *)context;
00328   struct replay_context container;
00329   memcached_st *check_for_failure;
00330   memcached_dump_func callbacks[1];
00331 
00332   callbacks[0]= (memcached_dump_fn)&callback_for_key;
00333 
00334   gearman_log_info(server->gearman, "libmemcached replay start");
00335 
00336   memset(&container, 0, sizeof(struct replay_context));
00337   check_for_failure= memcached_clone(&container.clone, &queue->memc);
00338   container.server= server;
00339   container.add_fn= add_fn;
00340   container.add_context= add_context;
00341 
00342   assert(check_for_failure);
00343 
00344 
00345   (void)memcached_dump(&queue->memc, callbacks, (void *)&container, 1);
00346 
00347   memcached_free(&container.clone);
00348 
00349   return GEARMAN_SUCCESS;
00350 }