00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016 #include <libgearman-server/queue_libdrizzle.h>
00017 #include <libdrizzle/drizzle_client.h>
00018
/** Database selected on the connection when no "db" option is supplied. */
#define GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE "test"

/** Table name used when no "table" option is supplied. */
#define GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_TABLE "queue"

/**
 * Bytes reserved in the query buffer for the fixed SQL text; the escaped
 * payload sizes are added on top of this by the add/done functions.
 */
#define GEARMAN_QUEUE_QUERY_BUFFER 256
00031
00032
00033
00034
00035
00039 typedef struct
00040 {
00041 drizzle_st drizzle;
00042 drizzle_con_st con;
00043 drizzle_result_st result;
00044 char table[DRIZZLE_MAX_TABLE_SIZE];
00045 char *query;
00046 size_t query_size;
00047 } gearman_queue_libdrizzle_st;
00048
00052 static drizzle_return_t _libdrizzle_query(gearman_server_st *server,
00053 gearman_queue_libdrizzle_st *queue,
00054 const char *query, size_t query_size);
00055
00056
00057 static gearman_return_t _libdrizzle_add(gearman_server_st *server,
00058 void *context, const void *unique,
00059 size_t unique_size,
00060 const void *function_name,
00061 size_t function_name_size,
00062 const void *data, size_t data_size,
00063 gearman_job_priority_t priority);
00064 static gearman_return_t _libdrizzle_flush(gearman_server_st *gearman,
00065 void *context);
00066 static gearman_return_t _libdrizzle_done(gearman_server_st *gearman,
00067 void *context, const void *unique,
00068 size_t unique_size,
00069 const void *function_name,
00070 size_t function_name_size);
00071 static gearman_return_t _libdrizzle_replay(gearman_server_st *gearman,
00072 void *context,
00073 gearman_queue_add_fn *add_fn,
00074 void *add_context);
00075
00078
00079
00080
00081
00082 gearman_return_t gearman_server_queue_libdrizzle_conf(gearman_conf_st *conf)
00083 {
00084 gearman_conf_module_st *module;
00085
00086 module= gearman_conf_module_create(conf, NULL, "libdrizzle");
00087 if (module == NULL)
00088 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00089
00090 #define MCO(__name, __value, __help) \
00091 gearman_conf_module_add_option(module, __name, 0, __value, __help);
00092
00093 MCO("host", "HOST", "Host of server.")
00094 MCO("port", "PORT", "Port of server.")
00095 MCO("uds", "UDS", "Unix domain socket for server.")
00096 MCO("user", "USER", "User name for authentication.")
00097 MCO("password", "PASSWORD", "Password for authentication.")
00098 MCO("db", "DB", "Database to use.")
00099 MCO("table", "TABLE", "Table to use.")
00100 MCO("mysql", NULL, "Use MySQL protocol.")
00101
00102 return gearman_conf_return(conf);
00103 }
00104
00105 gearman_return_t gearman_server_queue_libdrizzle_init(gearman_server_st *server,
00106 gearman_conf_st *conf)
00107 {
00108 gearman_queue_libdrizzle_st *queue;
00109 gearman_conf_module_st *module;
00110 const char *name;
00111 const char *value;
00112 const char *host= NULL;
00113 in_port_t port= 0;
00114 const char *uds= NULL;
00115 const char *user= NULL;
00116 const char *password= NULL;
00117 drizzle_row_t row;
00118 char create[1024];
00119
00120 GEARMAN_SERVER_INFO(server, "Initializing libdrizzle module")
00121
00122 queue= malloc(sizeof(gearman_queue_libdrizzle_st));
00123 if (queue == NULL)
00124 {
00125 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libdrizzle_init", "malloc")
00126 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00127 }
00128
00129 memset(queue, 0, sizeof(gearman_queue_libdrizzle_st));
00130 snprintf(queue->table, DRIZZLE_MAX_TABLE_SIZE,
00131 GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_TABLE);
00132
00133 if (drizzle_create(&(queue->drizzle)) == NULL)
00134 {
00135 free(queue);
00136 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libdrizzle_init",
00137 "drizzle_create")
00138 return GEARMAN_QUEUE_ERROR;
00139 }
00140
00141 if (drizzle_con_create(&(queue->drizzle), &(queue->con)) == NULL)
00142 {
00143 drizzle_free(&(queue->drizzle));
00144 free(queue);
00145 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libdrizzle_init",
00146 "drizzle_con_create")
00147 return GEARMAN_QUEUE_ERROR;
00148 }
00149
00150 gearman_server_set_queue_context(server, queue);
00151
00152 drizzle_con_set_db(&(queue->con), GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE);
00153
00154
00155 module= gearman_conf_module_find(conf, "libdrizzle");
00156 if (module == NULL)
00157 {
00158 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libdrizzle_init",
00159 "gearman_conf_module_find:NULL")
00160 return GEARMAN_QUEUE_ERROR;
00161 }
00162
00163 while (gearman_conf_module_value(module, &name, &value))
00164 {
00165 if (!strcmp(name, "host"))
00166 host= value;
00167 else if (!strcmp(name, "port"))
00168 port= (in_port_t)atoi(value);
00169 else if (!strcmp(name, "uds"))
00170 uds= value;
00171 else if (!strcmp(name, "user"))
00172 user= value;
00173 else if (!strcmp(name, "password"))
00174 password= value;
00175 else if (!strcmp(name, "db"))
00176 drizzle_con_set_db(&(queue->con), value);
00177 else if (!strcmp(name, "table"))
00178 snprintf(queue->table, DRIZZLE_MAX_TABLE_SIZE, "%s", value);
00179 else if (!strcmp(name, "mysql"))
00180 drizzle_con_set_options(&(queue->con), DRIZZLE_CON_MYSQL);
00181 else
00182 {
00183 gearman_server_queue_libdrizzle_deinit(server);
00184 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libdrizzle_init",
00185 "Unknown argument: %s", name)
00186 return GEARMAN_QUEUE_ERROR;
00187 }
00188 }
00189
00190 if (uds == NULL)
00191 drizzle_con_set_tcp(&(queue->con), host, port);
00192 else
00193 drizzle_con_set_uds(&(queue->con), uds);
00194
00195 drizzle_con_set_auth(&(queue->con), user, password);
00196
00197
00198 if (password != NULL)
00199 memset((void *)password, 'x', strlen(password));
00200
00201 if (_libdrizzle_query(server, queue, "SHOW TABLES", 11) != DRIZZLE_RETURN_OK)
00202 {
00203 gearman_server_queue_libdrizzle_deinit(server);
00204 return GEARMAN_QUEUE_ERROR;
00205 }
00206
00207 if (drizzle_result_buffer(&(queue->result)) != DRIZZLE_RETURN_OK)
00208 {
00209 drizzle_result_free(&(queue->result));
00210 gearman_server_queue_libdrizzle_deinit(server);
00211 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libdrizzle_init",
00212 "drizzle_result_buffer:%s",
00213 drizzle_error(&(queue->drizzle)))
00214 return GEARMAN_QUEUE_ERROR;
00215 }
00216
00217 while ((row= drizzle_row_next(&(queue->result))) != NULL)
00218 {
00219 if (!strcasecmp(queue->table, row[0]))
00220 {
00221 GEARMAN_SERVER_INFO(server, "libdrizzle module using table '%s.%s'",
00222 drizzle_con_db(&queue->con), row[0])
00223 break;
00224 }
00225 }
00226
00227 drizzle_result_free(&(queue->result));
00228
00229 if (row == NULL)
00230 {
00231 snprintf(create, 1024,
00232 "CREATE TABLE %s"
00233 "("
00234 "unique_key VARCHAR(%d) PRIMARY KEY,"
00235 "function_name VARCHAR(255),"
00236 "priority INT,"
00237 "data LONGBLOB"
00238 ")",
00239 queue->table, GEARMAN_UNIQUE_SIZE);
00240
00241 GEARMAN_SERVER_INFO(server, "libdrizzle module creating table '%s.%s'",
00242 drizzle_con_db(&queue->con), queue->table)
00243
00244 if (_libdrizzle_query(server, queue, create, strlen(create))
00245 != DRIZZLE_RETURN_OK)
00246 {
00247 gearman_server_queue_libdrizzle_deinit(server);
00248 return GEARMAN_QUEUE_ERROR;
00249 }
00250
00251 drizzle_result_free(&(queue->result));
00252 }
00253
00254 gearman_server_set_queue_add_fn(server, _libdrizzle_add);
00255 gearman_server_set_queue_flush_fn(server, _libdrizzle_flush);
00256 gearman_server_set_queue_done_fn(server, _libdrizzle_done);
00257 gearman_server_set_queue_replay_fn(server, _libdrizzle_replay);
00258
00259 return GEARMAN_SUCCESS;
00260 }
00261
00262 gearman_return_t
00263 gearman_server_queue_libdrizzle_deinit(gearman_server_st *server)
00264 {
00265 gearman_queue_libdrizzle_st *queue;
00266
00267 GEARMAN_SERVER_INFO(server, "Shutting down libdrizzle queue module")
00268
00269 queue= (gearman_queue_libdrizzle_st *)gearman_server_queue_context(server);
00270 gearman_server_set_queue_context(server, NULL);
00271 drizzle_con_free(&(queue->con));
00272 drizzle_free(&(queue->drizzle));
00273 if (queue->query != NULL)
00274 free(queue->query);
00275 free(queue);
00276
00277 return GEARMAN_SUCCESS;
00278 }
00279
00280 gearman_return_t gearmand_queue_libdrizzle_init(gearmand_st *gearmand,
00281 gearman_conf_st *conf)
00282 {
00283 return gearman_server_queue_libdrizzle_init(&(gearmand->server), conf);
00284 }
00285
00286 gearman_return_t gearmand_queue_libdrizzle_deinit(gearmand_st *gearmand)
00287 {
00288 return gearman_server_queue_libdrizzle_deinit(&(gearmand->server));
00289 }
00290
00291
00292
00293
00294
00295 static drizzle_return_t _libdrizzle_query(gearman_server_st *server,
00296 gearman_queue_libdrizzle_st *queue,
00297 const char *query, size_t query_size)
00298 {
00299 drizzle_return_t ret;
00300
00301 GEARMAN_SERVER_CRAZY(server, "libdrizzle query: %.*s", (uint32_t)query_size,
00302 query)
00303
00304 (void)drizzle_query(&(queue->con), &(queue->result), query, query_size, &ret);
00305 if (ret != DRIZZLE_RETURN_OK)
00306 {
00307
00308 if (ret == DRIZZLE_RETURN_LOST_CONNECTION)
00309 {
00310 (void)drizzle_query(&(queue->con), &(queue->result), query, query_size,
00311 &ret);
00312 }
00313
00314 if (ret != DRIZZLE_RETURN_OK)
00315 {
00316 GEARMAN_SERVER_ERROR_SET(server, "_libdrizzle_query", "drizzle_query:%s",
00317 drizzle_error(&(queue->drizzle)))
00318 return ret;
00319 }
00320 }
00321
00322 return DRIZZLE_RETURN_OK;
00323 }
00324
00325 static gearman_return_t _libdrizzle_add(gearman_server_st *server,
00326 void *context, const void *unique,
00327 size_t unique_size,
00328 const void *function_name,
00329 size_t function_name_size,
00330 const void *data, size_t data_size,
00331 gearman_job_priority_t priority)
00332 {
00333 gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00334 char *query;
00335 size_t query_size;
00336
00337 GEARMAN_SERVER_DEBUG(server, "libdrizzle add: %.*s", (uint32_t)unique_size,
00338 (char *)unique)
00339
00340
00341
00342 #if 0
00343 if (!not started)
00344 {
00345 if (_query(drizzle, "BEGIN", 5) != DRIZZLE_RETURN_OK)
00346 return REPQ_RETURN_EXTERNAL;
00347
00348 drizzle_result_free(&(drizzle->result));
00349 }
00350 #endif
00351
00352 query_size= ((unique_size + function_name_size + data_size) * 2) +
00353 GEARMAN_QUEUE_QUERY_BUFFER;
00354 if (query_size > queue->query_size)
00355 {
00356 query= realloc(queue->query, query_size);
00357 if (query == NULL)
00358 {
00359 GEARMAN_SERVER_ERROR_SET(server, "_libdrizzle_add", "realloc")
00360 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00361 }
00362
00363 queue->query= query;
00364 queue->query_size= query_size;
00365 }
00366 else
00367 query= queue->query;
00368
00369 query_size= (size_t)snprintf(query, query_size,
00370 "INSERT INTO %s SET priority=%u,unique_key='",
00371 queue->table, (uint32_t)priority);
00372
00373 query_size+= (size_t)drizzle_escape_string(query + query_size, unique,
00374 unique_size);
00375 memcpy(query + query_size, "',function_name='", 17);
00376 query_size+= 17;
00377
00378 query_size+= (size_t)drizzle_escape_string(query + query_size, function_name,
00379 function_name_size);
00380 memcpy(query + query_size, "',data='", 8);
00381 query_size+= 8;
00382
00383 query_size+= (size_t)drizzle_escape_string(query + query_size, data,
00384 data_size);
00385 memcpy(query + query_size, "'", 1);
00386 query_size+= 1;
00387
00388 if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00389 return GEARMAN_QUEUE_ERROR;
00390
00391 drizzle_result_free(&(queue->result));
00392
00393 return GEARMAN_SUCCESS;
00394 }
00395
00396 static gearman_return_t _libdrizzle_flush(gearman_server_st *server,
00397 void *context __attribute__((unused)))
00398 {
00399 GEARMAN_SERVER_DEBUG(server, "libdrizzle flush")
00400
00401 return GEARMAN_SUCCESS;
00402 }
00403
00404 static gearman_return_t _libdrizzle_done(gearman_server_st *server,
00405 void *context, const void *unique,
00406 size_t unique_size,
00407 const void *function_name __attribute__((unused)),
00408 size_t function_name_size __attribute__((unused)))
00409 {
00410 gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00411 char *query;
00412 size_t query_size;
00413
00414 GEARMAN_SERVER_DEBUG(server, "libdrizzle done: %.*s", (uint32_t)unique_size,
00415 (char *)unique)
00416
00417 query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
00418 if (query_size > queue->query_size)
00419 {
00420 query= realloc(queue->query, query_size);
00421 if (query == NULL)
00422 {
00423 GEARMAN_SERVER_ERROR_SET(server, "_libdrizzle_add", "realloc")
00424 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00425 }
00426
00427 queue->query= query;
00428 queue->query_size= query_size;
00429 }
00430 else
00431 query= queue->query;
00432
00433 query_size= (size_t)snprintf(query, query_size,
00434 "DELETE FROM %s WHERE unique_key='",
00435 queue->table);
00436
00437 query_size+= (size_t)drizzle_escape_string(query + query_size, unique,
00438 unique_size);
00439 memcpy(query + query_size, "'", 1);
00440 query_size+= 1;
00441
00442 if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00443 return GEARMAN_QUEUE_ERROR;
00444
00445 drizzle_result_free(&(queue->result));
00446
00447 return GEARMAN_SUCCESS;
00448 }
00449
00450 static gearman_return_t _libdrizzle_replay(gearman_server_st *server,
00451 void *context,
00452 gearman_queue_add_fn *add_fn,
00453 void *add_context)
00454 {
00455 gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00456 char *query;
00457 size_t query_size;
00458 drizzle_return_t ret;
00459 drizzle_row_t row;
00460 size_t *field_sizes;
00461 gearman_return_t gret;
00462
00463 GEARMAN_SERVER_INFO(server, "libdrizzle replay start")
00464
00465 if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
00466 {
00467 query= realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
00468 if (query == NULL)
00469 {
00470 GEARMAN_SERVER_ERROR_SET(server, "_libdrizzle_add", "realloc")
00471 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00472 }
00473
00474 queue->query= query;
00475 queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00476 }
00477 else
00478 query= queue->query;
00479
00480 query_size= (size_t)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
00481 "SELECT unique_key,function_name,priority,data "
00482 "FROM %s",
00483 queue->table);
00484
00485 if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00486 return GEARMAN_QUEUE_ERROR;
00487
00488 if (drizzle_column_skip(&(queue->result)) != DRIZZLE_RETURN_OK)
00489 {
00490 drizzle_result_free(&(queue->result));
00491 GEARMAN_SERVER_ERROR_SET(server, "_libdrizzle_replay",
00492 "drizzle_column_skip:%s",
00493 drizzle_error(&(queue->drizzle)))
00494 return GEARMAN_QUEUE_ERROR;
00495 }
00496
00497 while (1)
00498 {
00499 row= drizzle_row_buffer(&(queue->result), &ret);
00500 if (ret != DRIZZLE_RETURN_OK)
00501 {
00502 drizzle_result_free(&(queue->result));
00503 GEARMAN_SERVER_ERROR_SET(server, "_libdrizzle_replay",
00504 "drizzle_row_buffer:%s",
00505 drizzle_error(&(queue->drizzle)))
00506 return GEARMAN_QUEUE_ERROR;
00507 }
00508
00509 if (row == NULL)
00510 break;
00511
00512 field_sizes= drizzle_row_field_sizes(&(queue->result));
00513
00514 GEARMAN_SERVER_DEBUG(server, "libdrizzle replay: %.*s",
00515 (uint32_t)field_sizes[0], row[1])
00516
00517 gret= (*add_fn)(server, add_context, row[0], field_sizes[0], row[1],
00518 field_sizes[1], row[3], field_sizes[3], atoi(row[2]));
00519 if (gret != GEARMAN_SUCCESS)
00520 {
00521 drizzle_row_free(&(queue->result), row);
00522 drizzle_result_free(&(queue->result));
00523 return gret;
00524 }
00525
00526 row[3]= NULL;
00527 drizzle_row_free(&(queue->result), row);
00528 }
00529
00530 drizzle_result_free(&(queue->result));
00531
00532 return GEARMAN_SUCCESS;
00533 }