00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016 #include <libgearman-server/queue_libdrizzle.h>
00017 #include <libdrizzle/drizzle_client.h>
00018
/* Database selected on the connection unless the "db" option overrides it. */
#define GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE "test"

/* Table name used unless the "table" option overrides it. */
#define GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_TABLE "queue"

/*
 * Bytes added to every query buffer on top of the space reserved for
 * escaped values; it has to hold the fixed SQL text and the table name.
 */
#define GEARMAN_QUEUE_QUERY_BUFFER 256
00031
00035 typedef struct
00036 {
00037 drizzle_st drizzle;
00038 drizzle_con_st con;
00039 drizzle_result_st result;
00040 char table[DRIZZLE_MAX_TABLE_SIZE];
00041 char *query;
00042 size_t query_size;
00043 } gearman_queue_libdrizzle_st;
00044
00048 static drizzle_return_t _libdrizzle_query(gearman_server_st *server,
00049 gearman_queue_libdrizzle_st *queue,
00050 const char *query, size_t query_size);
00051
00052
00053 static gearman_return_t _libdrizzle_add(gearman_server_st *server,
00054 void *context, const void *unique,
00055 size_t unique_size,
00056 const void *function_name,
00057 size_t function_name_size,
00058 const void *data, size_t data_size,
00059 gearman_job_priority_t priority);
00060 static gearman_return_t _libdrizzle_flush(gearman_server_st *gearman,
00061 void *context);
00062 static gearman_return_t _libdrizzle_done(gearman_server_st *gearman,
00063 void *context, const void *unique,
00064 size_t unique_size,
00065 const void *function_name,
00066 size_t function_name_size);
00067 static gearman_return_t _libdrizzle_replay(gearman_server_st *gearman,
00068 void *context,
00069 gearman_queue_add_fn *add_fn,
00070 void *add_context);
00071
00074
00075
00076
00077
00078 gearman_return_t gearman_server_queue_libdrizzle_conf(gearman_conf_st *conf)
00079 {
00080 gearman_conf_module_st *module;
00081
00082 module= gearman_conf_module_create(conf, NULL, "libdrizzle");
00083 if (module == NULL)
00084 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00085
00086 #define MCO(__name, __value, __help) \
00087 gearman_conf_module_add_option(module, __name, 0, __value, __help);
00088
00089 MCO("host", "HOST", "Host of server.")
00090 MCO("port", "PORT", "Port of server.")
00091 MCO("uds", "UDS", "Unix domain socket for server.")
00092 MCO("user", "USER", "User name for authentication.")
00093 MCO("password", "PASSWORD", "Password for authentication.")
00094 MCO("db", "DB", "Database to use.")
00095 MCO("table", "TABLE", "Table to use.")
00096 MCO("mysql", NULL, "Use MySQL protocol.")
00097
00098 return gearman_conf_return(conf);
00099 }
00100
00101 gearman_return_t gearman_server_queue_libdrizzle_init(gearman_server_st *server,
00102 gearman_conf_st *conf)
00103 {
00104 gearman_queue_libdrizzle_st *queue;
00105 gearman_conf_module_st *module;
00106 const char *name;
00107 const char *value;
00108 const char *host= NULL;
00109 in_port_t port= 0;
00110 const char *uds= NULL;
00111 const char *user= NULL;
00112 const char *password= NULL;
00113 drizzle_row_t row;
00114 char create[1024];
00115
00116 gearman_log_info(server->gearman, "Initializing libdrizzle module");
00117
00118 queue= (gearman_queue_libdrizzle_st *)malloc(sizeof(gearman_queue_libdrizzle_st));
00119 if (queue == NULL)
00120 {
00121 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "malloc");
00122 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00123 }
00124
00125 memset(queue, 0, sizeof(gearman_queue_libdrizzle_st));
00126 snprintf(queue->table, DRIZZLE_MAX_TABLE_SIZE,
00127 GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_TABLE);
00128
00129 if (drizzle_create(&(queue->drizzle)) == NULL)
00130 {
00131 free(queue);
00132 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "drizzle_create");
00133 return GEARMAN_QUEUE_ERROR;
00134 }
00135
00136 if (drizzle_con_create(&(queue->drizzle), &(queue->con)) == NULL)
00137 {
00138 drizzle_free(&(queue->drizzle));
00139 free(queue);
00140 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "drizzle_con_create");
00141 return GEARMAN_QUEUE_ERROR;
00142 }
00143
00144 gearman_server_set_queue_context(server, queue);
00145
00146 drizzle_con_set_db(&(queue->con), GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE);
00147
00148
00149 module= gearman_conf_module_find(conf, "libdrizzle");
00150 if (module == NULL)
00151 {
00152 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "gearman_conf_module_find:NULL");
00153 return GEARMAN_QUEUE_ERROR;
00154 }
00155
00156 while (gearman_conf_module_value(module, &name, &value))
00157 {
00158 if (!strcmp(name, "host"))
00159 host= value;
00160 else if (!strcmp(name, "port"))
00161 port= (in_port_t)atoi(value);
00162 else if (!strcmp(name, "uds"))
00163 uds= value;
00164 else if (!strcmp(name, "user"))
00165 user= value;
00166 else if (!strcmp(name, "password"))
00167 password= value;
00168 else if (!strcmp(name, "db"))
00169 drizzle_con_set_db(&(queue->con), value);
00170 else if (!strcmp(name, "table"))
00171 snprintf(queue->table, DRIZZLE_MAX_TABLE_SIZE, "%s", value);
00172 else if (!strcmp(name, "mysql"))
00173 drizzle_con_set_options(&(queue->con), DRIZZLE_CON_MYSQL);
00174 else
00175 {
00176 gearman_server_queue_libdrizzle_deinit(server);
00177 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "Unknown argument: %s", name);
00178 return GEARMAN_QUEUE_ERROR;
00179 }
00180 }
00181
00182 if (uds == NULL)
00183 drizzle_con_set_tcp(&(queue->con), host, port);
00184 else
00185 drizzle_con_set_uds(&(queue->con), uds);
00186
00187 drizzle_con_set_auth(&(queue->con), user, password);
00188
00189
00190 if (password != NULL)
00191 memset((void *)password, 'x', strlen(password));
00192
00193 if (_libdrizzle_query(server, queue, "SHOW TABLES", 11) != DRIZZLE_RETURN_OK)
00194 {
00195 gearman_server_queue_libdrizzle_deinit(server);
00196 return GEARMAN_QUEUE_ERROR;
00197 }
00198
00199 if (drizzle_result_buffer(&(queue->result)) != DRIZZLE_RETURN_OK)
00200 {
00201 drizzle_result_free(&(queue->result));
00202 gearman_server_queue_libdrizzle_deinit(server);
00203 gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "drizzle_result_buffer:%s", drizzle_error(&(queue->drizzle)));
00204 return GEARMAN_QUEUE_ERROR;
00205 }
00206
00207 while ((row= drizzle_row_next(&(queue->result))) != NULL)
00208 {
00209 if (!strcasecmp(queue->table, row[0]))
00210 {
00211 gearman_log_info(server->gearman, "libdrizzle module using table '%s.%s'", drizzle_con_db(&queue->con), row[0]);
00212
00213 break;
00214 }
00215 }
00216
00217 drizzle_result_free(&(queue->result));
00218
00219 if (row == NULL)
00220 {
00221 snprintf(create, 1024,
00222 "CREATE TABLE %s"
00223 "("
00224 "unique_key VARCHAR(%d) PRIMARY KEY,"
00225 "function_name VARCHAR(255),"
00226 "priority INT,"
00227 "data LONGBLOB"
00228 ")",
00229 queue->table, GEARMAN_UNIQUE_SIZE);
00230
00231 gearman_log_info(server->gearman, "libdrizzle module creating table '%s.%s'",
00232 drizzle_con_db(&queue->con), queue->table);
00233
00234 if (_libdrizzle_query(server, queue, create, strlen(create))
00235 != DRIZZLE_RETURN_OK)
00236 {
00237 gearman_server_queue_libdrizzle_deinit(server);
00238 return GEARMAN_QUEUE_ERROR;
00239 }
00240
00241 drizzle_result_free(&(queue->result));
00242 }
00243
00244 gearman_server_set_queue_add_fn(server, _libdrizzle_add);
00245 gearman_server_set_queue_flush_fn(server, _libdrizzle_flush);
00246 gearman_server_set_queue_done_fn(server, _libdrizzle_done);
00247 gearman_server_set_queue_replay_fn(server, _libdrizzle_replay);
00248
00249 return GEARMAN_SUCCESS;
00250 }
00251
00252 gearman_return_t
00253 gearman_server_queue_libdrizzle_deinit(gearman_server_st *server)
00254 {
00255 gearman_queue_libdrizzle_st *queue;
00256
00257 gearman_log_info(server->gearman, "Shutting down libdrizzle queue module");
00258
00259 queue= (gearman_queue_libdrizzle_st *)gearman_server_queue_context(server);
00260 gearman_server_set_queue_context(server, NULL);
00261 drizzle_con_free(&(queue->con));
00262 drizzle_free(&(queue->drizzle));
00263 if (queue->query != NULL)
00264 free(queue->query);
00265 free(queue);
00266
00267 return GEARMAN_SUCCESS;
00268 }
00269
00270 gearman_return_t gearmand_queue_libdrizzle_init(gearmand_st *gearmand,
00271 gearman_conf_st *conf)
00272 {
00273 return gearman_server_queue_libdrizzle_init(&(gearmand->server), conf);
00274 }
00275
00276 gearman_return_t gearmand_queue_libdrizzle_deinit(gearmand_st *gearmand)
00277 {
00278 return gearman_server_queue_libdrizzle_deinit(&(gearmand->server));
00279 }
00280
00281
00282
00283
00284
00285 static drizzle_return_t _libdrizzle_query(gearman_server_st *server,
00286 gearman_queue_libdrizzle_st *queue,
00287 const char *query, size_t query_size)
00288 {
00289 drizzle_return_t ret;
00290
00291 gearman_log_crazy(server->gearman, "libdrizzle query: %.*s", (uint32_t)query_size, query);
00292
00293 (void)drizzle_query(&(queue->con), &(queue->result), query, query_size, &ret);
00294 if (ret != DRIZZLE_RETURN_OK)
00295 {
00296
00297 if (ret == DRIZZLE_RETURN_LOST_CONNECTION)
00298 {
00299 (void)drizzle_query(&(queue->con), &(queue->result), query, query_size,
00300 &ret);
00301 }
00302
00303 if (ret != DRIZZLE_RETURN_OK)
00304 {
00305 gearman_log_error(server->gearman, "_libdrizzle_query", "drizzle_query:%s",
00306 drizzle_error(&(queue->drizzle)));
00307 return ret;
00308 }
00309 }
00310
00311 return DRIZZLE_RETURN_OK;
00312 }
00313
00314 static gearman_return_t _libdrizzle_add(gearman_server_st *server,
00315 void *context, const void *unique,
00316 size_t unique_size,
00317 const void *function_name,
00318 size_t function_name_size,
00319 const void *data, size_t data_size,
00320 gearman_job_priority_t priority)
00321 {
00322 gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00323 char *query;
00324 size_t query_size;
00325
00326 gearman_log_debug(server->gearman, "libdrizzle add: %.*s", (uint32_t)unique_size, (char *)unique);
00327
00328
00329
00330 #if 0
00331 if (!not started)
00332 {
00333 if (_query(drizzle, "BEGIN", 5) != DRIZZLE_RETURN_OK)
00334 return REPQ_RETURN_EXTERNAL;
00335
00336 drizzle_result_free(&(drizzle->result));
00337 }
00338 #endif
00339
00340 query_size= ((unique_size + function_name_size + data_size) * 2) +
00341 GEARMAN_QUEUE_QUERY_BUFFER;
00342 if (query_size > queue->query_size)
00343 {
00344 query= (char *)realloc(queue->query, query_size);
00345 if (query == NULL)
00346 {
00347 gearman_log_error(server->gearman, "_libdrizzle_add", "realloc");
00348 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00349 }
00350
00351 queue->query= query;
00352 queue->query_size= query_size;
00353 }
00354 else
00355 {
00356 query= queue->query;
00357 }
00358
00359 query_size= (size_t)snprintf(query, query_size,
00360 "INSERT INTO %s SET priority=%u,unique_key='",
00361 queue->table, (uint32_t)priority);
00362
00363 query_size+= (size_t)drizzle_escape_string(query + query_size, unique,
00364 unique_size);
00365 memcpy(query + query_size, "',function_name='", 17);
00366 query_size+= 17;
00367
00368 query_size+= (size_t)drizzle_escape_string(query + query_size, function_name,
00369 function_name_size);
00370 memcpy(query + query_size, "',data='", 8);
00371 query_size+= 8;
00372
00373 query_size+= (size_t)drizzle_escape_string(query + query_size, data,
00374 data_size);
00375 memcpy(query + query_size, "'", 1);
00376 query_size+= 1;
00377
00378 if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00379 return GEARMAN_QUEUE_ERROR;
00380
00381 drizzle_result_free(&(queue->result));
00382
00383 return GEARMAN_SUCCESS;
00384 }
00385
00386 static gearman_return_t _libdrizzle_flush(gearman_server_st *server,
00387 void *context __attribute__((unused)))
00388 {
00389 gearman_log_debug(server->gearman, "libdrizzle flush");
00390
00391 return GEARMAN_SUCCESS;
00392 }
00393
00394 static gearman_return_t _libdrizzle_done(gearman_server_st *server,
00395 void *context, const void *unique,
00396 size_t unique_size,
00397 const void *function_name __attribute__((unused)),
00398 size_t function_name_size __attribute__((unused)))
00399 {
00400 gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00401 char *query;
00402 size_t query_size;
00403
00404 gearman_log_debug(server->gearman, "libdrizzle done: %.*s", (uint32_t)unique_size, (char *)unique);
00405
00406 query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
00407 if (query_size > queue->query_size)
00408 {
00409 query= (char *)realloc(queue->query, query_size);
00410 if (query == NULL)
00411 {
00412 gearman_log_error(server->gearman, "_libdrizzle_add", "realloc");
00413 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00414 }
00415
00416 queue->query= query;
00417 queue->query_size= query_size;
00418 }
00419 else
00420 query= queue->query;
00421
00422 query_size= (size_t)snprintf(query, query_size,
00423 "DELETE FROM %s WHERE unique_key='",
00424 queue->table);
00425
00426 query_size+= (size_t)drizzle_escape_string(query + query_size, unique,
00427 unique_size);
00428 memcpy(query + query_size, "'", 1);
00429 query_size+= 1;
00430
00431 if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00432 return GEARMAN_QUEUE_ERROR;
00433
00434 drizzle_result_free(&(queue->result));
00435
00436 return GEARMAN_SUCCESS;
00437 }
00438
00439 static gearman_return_t _libdrizzle_replay(gearman_server_st *server,
00440 void *context,
00441 gearman_queue_add_fn *add_fn,
00442 void *add_context)
00443 {
00444 gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00445 char *query;
00446 size_t query_size;
00447 drizzle_return_t ret;
00448 drizzle_row_t row;
00449 size_t *field_sizes;
00450 gearman_return_t gret;
00451
00452 gearman_log_info(server->gearman, "libdrizzle replay start");
00453
00454 if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
00455 {
00456 query= (char *)realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
00457 if (query == NULL)
00458 {
00459 gearman_log_error(server->gearman, "_libdrizzle_add", "realloc");
00460 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00461 }
00462
00463 queue->query= query;
00464 queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00465 }
00466 else
00467 query= queue->query;
00468
00469 query_size= (size_t)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
00470 "SELECT unique_key,function_name,priority,data "
00471 "FROM %s",
00472 queue->table);
00473
00474 if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00475 return GEARMAN_QUEUE_ERROR;
00476
00477 if (drizzle_column_skip(&(queue->result)) != DRIZZLE_RETURN_OK)
00478 {
00479 drizzle_result_free(&(queue->result));
00480 gearman_log_error(server->gearman, "_libdrizzle_replay", "drizzle_column_skip:%s", drizzle_error(&(queue->drizzle)));
00481
00482 return GEARMAN_QUEUE_ERROR;
00483 }
00484
00485 while (1)
00486 {
00487 row= drizzle_row_buffer(&(queue->result), &ret);
00488 if (ret != DRIZZLE_RETURN_OK)
00489 {
00490 drizzle_result_free(&(queue->result));
00491 gearman_log_error(server->gearman, "_libdrizzle_replay", "drizzle_row_buffer:%s", drizzle_error(&(queue->drizzle)));
00492
00493 return GEARMAN_QUEUE_ERROR;
00494 }
00495
00496 if (row == NULL)
00497 break;
00498
00499 field_sizes= drizzle_row_field_sizes(&(queue->result));
00500
00501 gearman_log_debug(server->gearman, "libdrizzle replay: %.*s", (uint32_t)field_sizes[0], row[1]);
00502
00503 gret= (*add_fn)(server, add_context, row[0], field_sizes[0], row[1],
00504 field_sizes[1], row[3], field_sizes[3], atoi(row[2]));
00505 if (gret != GEARMAN_SUCCESS)
00506 {
00507 drizzle_row_free(&(queue->result), row);
00508 drizzle_result_free(&(queue->result));
00509 return gret;
00510 }
00511
00512 row[3]= NULL;
00513 drizzle_row_free(&(queue->result), row);
00514 }
00515
00516 drizzle_result_free(&(queue->result));
00517
00518 return GEARMAN_SUCCESS;
00519 }