00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016 #include <libgearman-server/queue_libmemcached.h>
00017 #include <libmemcached/memcached.h>
00018
00028 #define GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX "gear_"
00029
00030
00031
00032
00033
00037 typedef struct
00038 {
00039 memcached_st memc;
00040 } gearman_queue_libmemcached_st;
00041
00042
00043 static gearman_return_t _libmemcached_add(gearman_server_st *server,
00044 void *context, const void *unique,
00045 size_t unique_size,
00046 const void *function_name,
00047 size_t function_name_size,
00048 const void *data, size_t data_size,
00049 gearman_job_priority_t priority);
00050 static gearman_return_t _libmemcached_flush(gearman_server_st *server,
00051 void *context);
00052 static gearman_return_t _libmemcached_done(gearman_server_st *server,
00053 void *context, const void *unique,
00054 size_t unique_size,
00055 const void *function_name,
00056 size_t function_name_size);
00057 static gearman_return_t _libmemcached_replay(gearman_server_st *server,
00058 void *context,
00059 gearman_queue_add_fn *add_fn,
00060 void *add_context);
00061
00064
00065
00066
00067
00068 gearman_return_t gearman_server_queue_libmemcached_conf(gearman_conf_st *conf)
00069 {
00070 gearman_conf_module_st *module;
00071
00072 module= gearman_conf_module_create(conf, NULL, "libmemcached");
00073 if (module == NULL)
00074 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00075
00076 gearman_conf_module_add_option(module, "servers", 0, "SERVER_LIST",
00077 "List of Memcached servers to use.");
00078 return gearman_conf_return(conf);
00079 }
00080
00081 gearman_return_t
00082 gearman_server_queue_libmemcached_init(gearman_server_st *server,
00083 gearman_conf_st *conf)
00084 {
00085 gearman_queue_libmemcached_st *queue;
00086 gearman_conf_module_st *module;
00087 const char *name;
00088 const char *value;
00089 memcached_server_st *servers;
00090 const char *opt_servers= NULL;
00091
00092 GEARMAN_SERVER_INFO(server, "Initializing libmemcached module")
00093
00094 queue= calloc(1, sizeof(gearman_queue_libmemcached_st));
00095 if (queue == NULL)
00096 {
00097 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libmemcached_init",
00098 "malloc")
00099 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00100 }
00101
00102 if (memcached_create(&(queue->memc)) == NULL)
00103 {
00104 free(queue);
00105 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libmemcached_init",
00106 "memcached_create")
00107 return GEARMAN_QUEUE_ERROR;
00108 }
00109
00110
00111 module= gearman_conf_module_find(conf, "libmemcached");
00112 if (module == NULL)
00113 {
00114 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libmemcached_init",
00115 "gearman_conf_module_find:NULL")
00116 return GEARMAN_QUEUE_ERROR;
00117 }
00118
00119 while (gearman_conf_module_value(module, &name, &value))
00120 {
00121 if (!strcmp(name, "servers"))
00122 opt_servers= value;
00123 else
00124 {
00125 memcached_free(&(queue->memc));
00126 free(queue);
00127 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libmemcached_init",
00128 "Unknown argument: %s", name)
00129 return GEARMAN_QUEUE_ERROR;
00130 }
00131 }
00132
00133 if (opt_servers == NULL)
00134 {
00135 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libmemcached_init",
00136 "No --servers given")
00137 return GEARMAN_QUEUE_ERROR;
00138 }
00139
00140 servers= memcached_servers_parse(opt_servers);
00141
00142 if (servers == NULL)
00143 {
00144 memcached_free(&(queue->memc));
00145 free(queue);
00146
00147 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libmemcached_init",
00148 "memcached_servers_parse")
00149
00150 return GEARMAN_QUEUE_ERROR;
00151 }
00152
00153 memcached_server_push(&queue->memc, servers);
00154 memcached_server_list_free(servers);
00155
00156 gearman_server_set_queue_context(server, queue);
00157
00158 gearman_server_set_queue_add_fn(server, _libmemcached_add);
00159 gearman_server_set_queue_flush_fn(server, _libmemcached_flush);
00160 gearman_server_set_queue_done_fn(server, _libmemcached_done);
00161 gearman_server_set_queue_replay_fn(server, _libmemcached_replay);
00162
00163 return GEARMAN_SUCCESS;
00164 }
00165
00166 gearman_return_t
00167 gearman_server_queue_libmemcached_deinit(gearman_server_st *server)
00168 {
00169 gearman_queue_libmemcached_st *queue;
00170
00171 GEARMAN_SERVER_INFO(server, "Shutting down libmemcached queue module");
00172
00173 queue= (gearman_queue_libmemcached_st *)gearman_server_queue_context(server);
00174 gearman_server_set_queue_context(server, NULL);
00175 memcached_free(&(queue->memc));
00176
00177 free(queue);
00178
00179 return GEARMAN_SUCCESS;
00180 }
00181
00182 gearman_return_t gearmand_queue_libmemcached_init(gearmand_st *gearmand,
00183 gearman_conf_st *conf)
00184 {
00185 return gearman_server_queue_libmemcached_init(&(gearmand->server), conf);
00186 }
00187
00188 gearman_return_t gearmand_queue_libmemcached_deinit(gearmand_st *gearmand)
00189 {
00190 return gearman_server_queue_libmemcached_deinit(&(gearmand->server));
00191 }
00192
00193
00194
00195
00196
00197 static gearman_return_t _libmemcached_add(gearman_server_st *server,
00198 void *context, const void *unique,
00199 size_t unique_size,
00200 const void *function_name,
00201 size_t function_name_size,
00202 const void *data, size_t data_size,
00203 gearman_job_priority_t priority)
00204 {
00205 gearman_queue_libmemcached_st *queue= (gearman_queue_libmemcached_st *)context;
00206 memcached_return rc;
00207 char key[MEMCACHED_MAX_KEY];
00208 size_t key_length;
00209
00210 GEARMAN_SERVER_DEBUG(server, "libmemcached add: %.*s", (uint32_t)unique_size,
00211 (char *)unique);
00212
00213 key_length= (size_t)snprintf(key, MEMCACHED_MAX_KEY, "%s%.*s-%.*s",
00214 GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX,
00215 (int)function_name_size,
00216 (const char *)function_name, (int)unique_size,
00217 (const char *)unique);
00218
00219 rc= memcached_set(&queue->memc, (const char *)key, key_length,
00220 (const char *)data, data_size, 0, (uint32_t)priority);
00221
00222 if (rc != MEMCACHED_SUCCESS)
00223 return GEARMAN_QUEUE_ERROR;
00224
00225 return GEARMAN_SUCCESS;
00226 }
00227
00228 static gearman_return_t _libmemcached_flush(gearman_server_st *server,
00229 void *context __attribute__((unused)))
00230 {
00231 GEARMAN_SERVER_DEBUG(server, "libmemcached flush");
00232
00233 return GEARMAN_SUCCESS;
00234 }
00235
00236 static gearman_return_t _libmemcached_done(gearman_server_st *server,
00237 void *context, const void *unique,
00238 size_t unique_size,
00239 const void *function_name,
00240 size_t function_name_size)
00241 {
00242 size_t key_length;
00243 char key[MEMCACHED_MAX_KEY];
00244 memcached_return rc;
00245 gearman_queue_libmemcached_st *queue= (gearman_queue_libmemcached_st *)context;
00246
00247 GEARMAN_SERVER_DEBUG(server, "libmemcached done: %.*s", (uint32_t)unique_size,
00248 (char *)unique);
00249
00250 key_length= (size_t)snprintf(key, MEMCACHED_MAX_KEY, "%s%.*s-%.*s",
00251 GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX,
00252 (int)function_name_size,
00253 (const char *)function_name, (int)unique_size,
00254 (const char *)unique);
00255
00256
00257 rc= memcached_delete(&queue->memc, (const char *)key, key_length, 0);
00258
00259 if (rc != MEMCACHED_SUCCESS)
00260 return GEARMAN_QUEUE_ERROR;
00261
00262 return GEARMAN_SUCCESS;
00263 }
00264
00265 struct replay_context
00266 {
00267 memcached_st clone;
00268 gearman_server_st *server;
00269 gearman_queue_add_fn *add_fn;
00270 void *add_context;
00271 };
00272
00273 static memcached_return callback_loader(memcached_st *ptr __attribute__((unused)),
00274 memcached_result_st *result __attribute__((unused)),
00275 void *context)
00276 {
00277 struct replay_context *container= (struct replay_context *)context;
00278 char *unique;
00279 char *function;
00280 size_t unique_len;
00281
00282 unique= memcached_result_key_value(result);
00283
00284 if (strcmp(unique, GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX))
00285 return MEMCACHED_SUCCESS;
00286
00287 unique+=strlen(GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX);
00288
00289 function= index(unique, '-');
00290 unique_len= (size_t)(function - unique);
00291 function++;
00292
00293 assert(unique);
00294 assert(unique_len);
00295 assert(function);
00296 assert(strlen(function));
00297
00298 (void)(*container->add_fn)(container->server, container->add_context,
00299 unique, unique_len,
00300 function, strlen(function),
00301 memcached_result_value(result), memcached_result_length(result),
00302 memcached_result_flags(result));
00303
00304
00305 return MEMCACHED_SUCCESS;
00306 }
00307
00308
00309 static memcached_return callback_for_key(memcached_st *ptr __attribute__((unused)),
00310 const char *key, size_t key_length,
00311 void *context)
00312 {
00313 memcached_return rc;
00314 struct replay_context *container= (struct replay_context *)context;
00315 memcached_execute_function callbacks[1];
00316 char *passable[1];
00317
00318 callbacks[0]= &callback_loader;
00319
00320 passable[0]= (char *)key;
00321 rc= memcached_mget(&container->clone, (void *)passable, &key_length, 1);
00322
00323
00324 (void)memcached_fetch_execute(&container->clone, callbacks, context, 1);
00325
00326 return MEMCACHED_SUCCESS;
00327 }
00328
00329
00330
00331
00332 static gearman_return_t _libmemcached_replay(gearman_server_st *server, void *context,
00333 gearman_queue_add_fn *add_fn,
00334 void *add_context)
00335 {
00336 gearman_queue_libmemcached_st *queue= (gearman_queue_libmemcached_st *)context;
00337 struct replay_context container;
00338 memcached_st *check_for_failure;
00339 memcached_dump_func callbacks[1];
00340
00341 callbacks[0]= &callback_for_key;
00342
00343 GEARMAN_SERVER_INFO(server, "libmemcached replay start");
00344
00345 memset(&container, 0, sizeof(struct replay_context));
00346 check_for_failure= memcached_clone(&container.clone, &queue->memc);
00347 container.server= server;
00348 container.add_fn= add_fn;
00349 container.add_context= add_context;
00350
00351 assert(check_for_failure);
00352
00353
00354 (void)memcached_dump(&queue->memc, callbacks, (void *)&container, 1);
00355
00356 memcached_free(&container.clone);
00357
00358 return GEARMAN_SUCCESS;
00359 }