00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016 #include <libgearman-server/queue_libmemcached.h>
00017 #include <libmemcached/memcached.h>
00018
00028 #define GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX "gear_"
00029
00033 typedef struct
00034 {
00035 memcached_st memc;
00036 } gearman_queue_libmemcached_st;
00037
00038
00039 static gearman_return_t _libmemcached_add(gearman_server_st *server,
00040 void *context, const void *unique,
00041 size_t unique_size,
00042 const void *function_name,
00043 size_t function_name_size,
00044 const void *data, size_t data_size,
00045 gearman_job_priority_t priority);
00046 static gearman_return_t _libmemcached_flush(gearman_server_st *server,
00047 void *context);
00048 static gearman_return_t _libmemcached_done(gearman_server_st *server,
00049 void *context, const void *unique,
00050 size_t unique_size,
00051 const void *function_name,
00052 size_t function_name_size);
00053 static gearman_return_t _libmemcached_replay(gearman_server_st *server,
00054 void *context,
00055 gearman_queue_add_fn *add_fn,
00056 void *add_context);
00057
00060
00061
00062
00063
00064 gearman_return_t gearman_server_queue_libmemcached_conf(gearman_conf_st *conf)
00065 {
00066 gearman_conf_module_st *module;
00067
00068 module= gearman_conf_module_create(conf, NULL, "libmemcached");
00069 if (module == NULL)
00070 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00071
00072 gearman_conf_module_add_option(module, "servers", 0, "SERVER_LIST",
00073 "List of Memcached servers to use.");
00074 return gearman_conf_return(conf);
00075 }
00076
00077 gearman_return_t
00078 gearman_server_queue_libmemcached_init(gearman_server_st *server,
00079 gearman_conf_st *conf)
00080 {
00081 gearman_queue_libmemcached_st *queue;
00082 gearman_conf_module_st *module;
00083 const char *name;
00084 const char *value;
00085 memcached_server_st *servers;
00086 const char *opt_servers= NULL;
00087
00088 gearman_log_info(server->gearman, "Initializing libmemcached module");
00089
00090 queue= calloc(1, sizeof(gearman_queue_libmemcached_st));
00091 if (queue == NULL)
00092 {
00093 gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "malloc");
00094
00095 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00096 }
00097
00098 if (memcached_create(&(queue->memc)) == NULL)
00099 {
00100 free(queue);
00101 gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "memcached_create");
00102 return GEARMAN_QUEUE_ERROR;
00103 }
00104
00105
00106 module= gearman_conf_module_find(conf, "libmemcached");
00107 if (module == NULL)
00108 {
00109 gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "gearman_conf_module_find:NULL");
00110
00111 return GEARMAN_QUEUE_ERROR;
00112 }
00113
00114 while (gearman_conf_module_value(module, &name, &value))
00115 {
00116 if (!strcmp(name, "servers"))
00117 opt_servers= value;
00118 else
00119 {
00120 memcached_free(&(queue->memc));
00121 free(queue);
00122 gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "Unknown argument: %s", name);
00123 return GEARMAN_QUEUE_ERROR;
00124 }
00125 }
00126
00127 if (opt_servers == NULL)
00128 {
00129 gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "No --servers given");
00130 return GEARMAN_QUEUE_ERROR;
00131 }
00132
00133 servers= memcached_servers_parse(opt_servers);
00134
00135 if (servers == NULL)
00136 {
00137 memcached_free(&(queue->memc));
00138 free(queue);
00139
00140 gearman_log_error(server->gearman, "gearman_queue_libmemcached_init", "memcached_servers_parse");
00141
00142 return GEARMAN_QUEUE_ERROR;
00143 }
00144
00145 memcached_server_push(&queue->memc, servers);
00146 memcached_server_list_free(servers);
00147
00148 gearman_server_set_queue_context(server, queue);
00149
00150 gearman_server_set_queue_add_fn(server, _libmemcached_add);
00151 gearman_server_set_queue_flush_fn(server, _libmemcached_flush);
00152 gearman_server_set_queue_done_fn(server, _libmemcached_done);
00153 gearman_server_set_queue_replay_fn(server, _libmemcached_replay);
00154
00155 return GEARMAN_SUCCESS;
00156 }
00157
00158 gearman_return_t
00159 gearman_server_queue_libmemcached_deinit(gearman_server_st *server)
00160 {
00161 gearman_queue_libmemcached_st *queue;
00162
00163 gearman_log_info(server->gearman, "Shutting down libmemcached queue module");
00164
00165 queue= (gearman_queue_libmemcached_st *)gearman_server_queue_context(server);
00166 gearman_server_set_queue_context(server, NULL);
00167 memcached_free(&(queue->memc));
00168
00169 free(queue);
00170
00171 return GEARMAN_SUCCESS;
00172 }
00173
00174 gearman_return_t gearmand_queue_libmemcached_init(gearmand_st *gearmand,
00175 gearman_conf_st *conf)
00176 {
00177 return gearman_server_queue_libmemcached_init(&(gearmand->server), conf);
00178 }
00179
00180 gearman_return_t gearmand_queue_libmemcached_deinit(gearmand_st *gearmand)
00181 {
00182 return gearman_server_queue_libmemcached_deinit(&(gearmand->server));
00183 }
00184
00185
00186
00187
00188
00189 static gearman_return_t _libmemcached_add(gearman_server_st *server,
00190 void *context, const void *unique,
00191 size_t unique_size,
00192 const void *function_name,
00193 size_t function_name_size,
00194 const void *data, size_t data_size,
00195 gearman_job_priority_t priority)
00196 {
00197 gearman_queue_libmemcached_st *queue= (gearman_queue_libmemcached_st *)context;
00198 memcached_return rc;
00199 char key[MEMCACHED_MAX_KEY];
00200 size_t key_length;
00201
00202 gearman_log_debug(server->gearman, "libmemcached add: %.*s", (uint32_t)unique_size, (char *)unique);
00203
00204 key_length= (size_t)snprintf(key, MEMCACHED_MAX_KEY, "%s%.*s-%.*s",
00205 GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX,
00206 (int)function_name_size,
00207 (const char *)function_name, (int)unique_size,
00208 (const char *)unique);
00209
00210 rc= memcached_set(&queue->memc, (const char *)key, key_length,
00211 (const char *)data, data_size, 0, (uint32_t)priority);
00212
00213 if (rc != MEMCACHED_SUCCESS)
00214 return GEARMAN_QUEUE_ERROR;
00215
00216 return GEARMAN_SUCCESS;
00217 }
00218
00219 static gearman_return_t _libmemcached_flush(gearman_server_st *server,
00220 void *context __attribute__((unused)))
00221 {
00222 gearman_log_debug(server->gearman, "libmemcached flush");
00223
00224 return GEARMAN_SUCCESS;
00225 }
00226
00227 static gearman_return_t _libmemcached_done(gearman_server_st *server,
00228 void *context, const void *unique,
00229 size_t unique_size,
00230 const void *function_name,
00231 size_t function_name_size)
00232 {
00233 size_t key_length;
00234 char key[MEMCACHED_MAX_KEY];
00235 memcached_return rc;
00236 gearman_queue_libmemcached_st *queue= (gearman_queue_libmemcached_st *)context;
00237
00238 gearman_log_debug(server->gearman, "libmemcached done: %.*s", (uint32_t)unique_size, (char *)unique);
00239
00240 key_length= (size_t)snprintf(key, MEMCACHED_MAX_KEY, "%s%.*s-%.*s",
00241 GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX,
00242 (int)function_name_size,
00243 (const char *)function_name, (int)unique_size,
00244 (const char *)unique);
00245
00246
00247 rc= memcached_delete(&queue->memc, (const char *)key, key_length, 0);
00248
00249 if (rc != MEMCACHED_SUCCESS)
00250 return GEARMAN_QUEUE_ERROR;
00251
00252 return GEARMAN_SUCCESS;
00253 }
00254
00255 struct replay_context
00256 {
00257 memcached_st clone;
00258 gearman_server_st *server;
00259 gearman_queue_add_fn *add_fn;
00260 void *add_context;
00261 };
00262
00263 static memcached_return callback_loader(const memcached_st *ptr __attribute__((unused)),
00264 memcached_result_st *result __attribute__((unused)),
00265 void *context)
00266 {
00267 struct replay_context *container= (struct replay_context *)context;
00268 const char *key;
00269 const char *unique;
00270 char *function;
00271 size_t unique_len;
00272
00273 key= memcached_result_key_value(result);
00274
00275 if (strcmp(key, GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX))
00276 return MEMCACHED_SUCCESS;
00277
00278 unique= key + strlen(GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX);
00279
00280 function= index(unique, '-');
00281 unique_len= (size_t)(function - unique);
00282 function++;
00283
00284 assert(unique);
00285 assert(unique_len);
00286 assert(function);
00287 assert(strlen(function));
00288
00289 (void)(*container->add_fn)(container->server, container->add_context,
00290 unique, unique_len,
00291 function, strlen(function),
00292 memcached_result_value(result), memcached_result_length(result),
00293 memcached_result_flags(result));
00294
00295
00296 return MEMCACHED_SUCCESS;
00297 }
00298
00299
00300 static memcached_return callback_for_key(const memcached_st *ptr __attribute__((unused)),
00301 const char *key, size_t key_length,
00302 void *context)
00303 {
00304 memcached_return rc;
00305 struct replay_context *container= (struct replay_context *)context;
00306 memcached_execute_function callbacks[1];
00307 char *passable[1];
00308
00309 callbacks[0]= (memcached_execute_fn)&callback_loader;
00310
00311 passable[0]= (char *)key;
00312 rc= memcached_mget(&container->clone, (void *)passable, &key_length, 1);
00313
00314
00315 (void)memcached_fetch_execute(&container->clone, callbacks, context, 1);
00316
00317 return MEMCACHED_SUCCESS;
00318 }
00319
00320
00321
00322
00323 static gearman_return_t _libmemcached_replay(gearman_server_st *server, void *context,
00324 gearman_queue_add_fn *add_fn,
00325 void *add_context)
00326 {
00327 gearman_queue_libmemcached_st *queue= (gearman_queue_libmemcached_st *)context;
00328 struct replay_context container;
00329 memcached_st *check_for_failure;
00330 memcached_dump_func callbacks[1];
00331
00332 callbacks[0]= (memcached_dump_fn)&callback_for_key;
00333
00334 gearman_log_info(server->gearman, "libmemcached replay start");
00335
00336 memset(&container, 0, sizeof(struct replay_context));
00337 check_for_failure= memcached_clone(&container.clone, &queue->memc);
00338 container.server= server;
00339 container.add_fn= add_fn;
00340 container.add_context= add_context;
00341
00342 assert(check_for_failure);
00343
00344
00345 (void)memcached_dump(&queue->memc, callbacks, (void *)&container, 1);
00346
00347 memcached_free(&container.clone);
00348
00349 return GEARMAN_SUCCESS;
00350 }