Gearman Developer Documentation

libgearman-server/queue_libdrizzle.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00016 #include <libgearman-server/queue_libdrizzle.h>
00017 #include <libdrizzle/drizzle_client.h>
00018 
00028 #define GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE "test" /* Database selected on the connection unless the "db" option overrides it. */
00029 #define GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_TABLE "queue" /* Queue table name used unless the "table" option overrides it. */
00030 #define GEARMAN_QUEUE_QUERY_BUFFER 256 /* Bytes reserved for the fixed SQL text of a statement, on top of twice the payload size reserved for escaped values. */
00031 
00035 typedef struct /* State of one libdrizzle-backed persistent queue; stored as the server's queue context. */
00036 {
00037   drizzle_st drizzle; /* Library handle; drizzle_error() is read from it when a failure is logged. */
00038   drizzle_con_st con; /* Connection every query in this file is sent on. */
00039   drizzle_result_st result; /* Result of the most recent query; released with drizzle_result_free(). */
00040   char table[DRIZZLE_MAX_TABLE_SIZE]; /* Queue table name: the default, or the "table" option value. */
00041   char *query; /* Reusable query text buffer, grown with realloc() when a statement needs more room. */
00042   size_t query_size; /* Allocated size of query in bytes (0 while query is NULL). */
00043 } gearman_queue_libdrizzle_st;
00044 
00048 static drizzle_return_t _libdrizzle_query(gearman_server_st *server,
00049                                           gearman_queue_libdrizzle_st *queue,
00050                                           const char *query, size_t query_size); /* Runs one query on the queue connection, retrying once if the connection was lost. */
00051 
00052 /* Queue callback functions, installed on the server by gearman_server_queue_libdrizzle_init(). */
00053 static gearman_return_t _libdrizzle_add(gearman_server_st *server,
00054                                         void *context, const void *unique,
00055                                         size_t unique_size,
00056                                         const void *function_name,
00057                                         size_t function_name_size,
00058                                         const void *data, size_t data_size,
00059                                         gearman_job_priority_t priority); /* INSERTs one job row into the queue table. */
00060 static gearman_return_t _libdrizzle_flush(gearman_server_st *gearman,
00061                                           void *context); /* Only logs; nothing is buffered by this module. */
00062 static gearman_return_t _libdrizzle_done(gearman_server_st *gearman,
00063                                           void *context, const void *unique,
00064                                          size_t unique_size,
00065                                          const void *function_name,
00066                                          size_t function_name_size); /* DELETEs the row whose unique_key matches the job. */
00067 static gearman_return_t _libdrizzle_replay(gearman_server_st *gearman,
00068                                            void *context,
00069                                            gearman_queue_add_fn *add_fn,
00070                                            void *add_context); /* SELECTs every stored row and passes each one to add_fn. */
00071 
00074 /*
00075  * Public definitions
00076  */
00077 
00078 gearman_return_t gearman_server_queue_libdrizzle_conf(gearman_conf_st *conf)
00079 {
00080   gearman_conf_module_st *module;
00081 
00082   module= gearman_conf_module_create(conf, NULL, "libdrizzle");
00083   if (module == NULL)
00084     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00085 
00086 #define MCO(__name, __value, __help) \
00087   gearman_conf_module_add_option(module, __name, 0, __value, __help);
00088 
00089   MCO("host", "HOST", "Host of server.")
00090   MCO("port", "PORT", "Port of server.")
00091   MCO("uds", "UDS", "Unix domain socket for server.")
00092   MCO("user", "USER", "User name for authentication.")
00093   MCO("password", "PASSWORD", "Password for authentication.")
00094   MCO("db", "DB", "Database to use.")
00095   MCO("table", "TABLE", "Table to use.")
00096   MCO("mysql", NULL, "Use MySQL protocol.")
00097 
00098   return gearman_conf_return(conf);
00099 }
00100 
00101 gearman_return_t gearman_server_queue_libdrizzle_init(gearman_server_st *server,
00102                                                       gearman_conf_st *conf)
00103 {
00104   gearman_queue_libdrizzle_st *queue;
00105   gearman_conf_module_st *module;
00106   const char *name;
00107   const char *value;
00108   const char *host= NULL;
00109   in_port_t port= 0;
00110   const char *uds= NULL;
00111   const char *user= NULL;
00112   const char *password= NULL;
00113   drizzle_row_t row;
00114   char create[1024];
00115 
00116   gearman_log_info(server->gearman, "Initializing libdrizzle module");
00117 
00118   queue= (gearman_queue_libdrizzle_st *)malloc(sizeof(gearman_queue_libdrizzle_st));
00119   if (queue == NULL)
00120   {
00121     gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "malloc");
00122     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00123   }
00124 
00125   memset(queue, 0, sizeof(gearman_queue_libdrizzle_st));
00126   snprintf(queue->table, DRIZZLE_MAX_TABLE_SIZE,
00127            GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_TABLE);
00128 
00129   if (drizzle_create(&(queue->drizzle)) == NULL)
00130   {
00131     free(queue);
00132     gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "drizzle_create");
00133     return GEARMAN_QUEUE_ERROR;
00134   }
00135 
00136   if (drizzle_con_create(&(queue->drizzle), &(queue->con)) == NULL)
00137   {
00138     drizzle_free(&(queue->drizzle));
00139     free(queue);
00140     gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "drizzle_con_create");
00141     return GEARMAN_QUEUE_ERROR;
00142   }
00143 
00144   gearman_server_set_queue_context(server, queue);
00145 
00146   drizzle_con_set_db(&(queue->con), GEARMAN_QUEUE_LIBDRIZZLE_DEFAULT_DATABASE);
00147 
00148   /* Get module and parse the option values that were given. */
00149   module= gearman_conf_module_find(conf, "libdrizzle");
00150   if (module == NULL)
00151   {
00152     gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "gearman_conf_module_find:NULL");
00153     return GEARMAN_QUEUE_ERROR;
00154   }
00155 
00156   while (gearman_conf_module_value(module, &name, &value))
00157   {
00158     if (!strcmp(name, "host"))
00159       host= value;
00160     else if (!strcmp(name, "port"))
00161       port= (in_port_t)atoi(value);
00162     else if (!strcmp(name, "uds"))
00163       uds= value;
00164     else if (!strcmp(name, "user"))
00165       user= value;
00166     else if (!strcmp(name, "password"))
00167       password= value;
00168     else if (!strcmp(name, "db"))
00169       drizzle_con_set_db(&(queue->con), value);
00170     else if (!strcmp(name, "table"))
00171       snprintf(queue->table, DRIZZLE_MAX_TABLE_SIZE, "%s", value);
00172     else if (!strcmp(name, "mysql"))
00173       drizzle_con_set_options(&(queue->con), DRIZZLE_CON_MYSQL);
00174     else
00175     {
00176       gearman_server_queue_libdrizzle_deinit(server);
00177       gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "Unknown argument: %s", name);
00178       return GEARMAN_QUEUE_ERROR;
00179     }
00180   }
00181 
00182   if (uds == NULL)
00183     drizzle_con_set_tcp(&(queue->con), host, port);
00184   else
00185     drizzle_con_set_uds(&(queue->con), uds);
00186 
00187   drizzle_con_set_auth(&(queue->con), user, password);
00188 
00189   /* Overwrite password string so it does not appear in 'ps' output. */
00190   if (password != NULL)
00191     memset((void *)password, 'x', strlen(password));
00192 
00193   if (_libdrizzle_query(server, queue, "SHOW TABLES", 11) != DRIZZLE_RETURN_OK)
00194   {
00195     gearman_server_queue_libdrizzle_deinit(server);
00196     return GEARMAN_QUEUE_ERROR;
00197   }
00198 
00199   if (drizzle_result_buffer(&(queue->result)) != DRIZZLE_RETURN_OK)
00200   {
00201     drizzle_result_free(&(queue->result));
00202     gearman_server_queue_libdrizzle_deinit(server);
00203     gearman_log_error(server->gearman, "gearman_queue_libdrizzle_init", "drizzle_result_buffer:%s", drizzle_error(&(queue->drizzle)));
00204     return GEARMAN_QUEUE_ERROR;
00205   }
00206 
00207   while ((row= drizzle_row_next(&(queue->result))) != NULL)
00208   {
00209     if (!strcasecmp(queue->table, row[0]))
00210     {
00211       gearman_log_info(server->gearman, "libdrizzle module using table '%s.%s'", drizzle_con_db(&queue->con), row[0]);
00212 
00213       break;
00214     }
00215   }
00216 
00217   drizzle_result_free(&(queue->result));
00218 
00219   if (row == NULL)
00220   {
00221     snprintf(create, 1024,
00222              "CREATE TABLE %s"
00223              "("
00224                "unique_key VARCHAR(%d) PRIMARY KEY,"
00225                "function_name VARCHAR(255),"
00226                "priority INT,"
00227                "data LONGBLOB"
00228              ")",
00229              queue->table, GEARMAN_UNIQUE_SIZE);
00230 
00231     gearman_log_info(server->gearman, "libdrizzle module creating table '%s.%s'",
00232                      drizzle_con_db(&queue->con), queue->table);
00233 
00234     if (_libdrizzle_query(server, queue, create, strlen(create))
00235         != DRIZZLE_RETURN_OK)
00236     {
00237       gearman_server_queue_libdrizzle_deinit(server);
00238       return GEARMAN_QUEUE_ERROR;
00239     }
00240 
00241     drizzle_result_free(&(queue->result));
00242   }
00243 
00244   gearman_server_set_queue_add_fn(server, _libdrizzle_add);
00245   gearman_server_set_queue_flush_fn(server, _libdrizzle_flush);
00246   gearman_server_set_queue_done_fn(server, _libdrizzle_done);
00247   gearman_server_set_queue_replay_fn(server, _libdrizzle_replay);
00248 
00249   return GEARMAN_SUCCESS;
00250 }
00251 
00252 gearman_return_t
00253 gearman_server_queue_libdrizzle_deinit(gearman_server_st *server)
00254 {
00255   gearman_queue_libdrizzle_st *queue;
00256 
00257   gearman_log_info(server->gearman, "Shutting down libdrizzle queue module");
00258 
00259   queue= (gearman_queue_libdrizzle_st *)gearman_server_queue_context(server);
00260   gearman_server_set_queue_context(server, NULL);
00261   drizzle_con_free(&(queue->con));
00262   drizzle_free(&(queue->drizzle));
00263   if (queue->query != NULL)
00264     free(queue->query);
00265   free(queue);
00266 
00267   return GEARMAN_SUCCESS;
00268 }
00269 
00270 gearman_return_t gearmand_queue_libdrizzle_init(gearmand_st *gearmand,
00271                                                 gearman_conf_st *conf)
00272 {
00273   return gearman_server_queue_libdrizzle_init(&(gearmand->server), conf);
00274 }
00275 
00276 gearman_return_t gearmand_queue_libdrizzle_deinit(gearmand_st *gearmand)
00277 {
00278   return gearman_server_queue_libdrizzle_deinit(&(gearmand->server));
00279 }
00280 
00281 /*
00282  * Static definitions
00283  */
00284 
00285 static drizzle_return_t _libdrizzle_query(gearman_server_st *server,
00286                                           gearman_queue_libdrizzle_st *queue,
00287                                           const char *query, size_t query_size)
00288 {
00289   drizzle_return_t ret;
00290 
00291   gearman_log_crazy(server->gearman, "libdrizzle query: %.*s", (uint32_t)query_size, query);
00292 
00293   (void)drizzle_query(&(queue->con), &(queue->result), query, query_size, &ret);
00294   if (ret != DRIZZLE_RETURN_OK)
00295   {
00296     /* If we lost the connection, try one more time before exiting. */
00297     if (ret == DRIZZLE_RETURN_LOST_CONNECTION)
00298     {
00299       (void)drizzle_query(&(queue->con), &(queue->result), query, query_size,
00300                           &ret);
00301     }
00302 
00303     if (ret != DRIZZLE_RETURN_OK)
00304     {
00305       gearman_log_error(server->gearman, "_libdrizzle_query", "drizzle_query:%s",
00306                                drizzle_error(&(queue->drizzle)));
00307       return ret;
00308     }
00309   }
00310 
00311   return DRIZZLE_RETURN_OK;
00312 }
00313 
00314 static gearman_return_t _libdrizzle_add(gearman_server_st *server,
00315                                         void *context, const void *unique,
00316                                         size_t unique_size,
00317                                         const void *function_name,
00318                                         size_t function_name_size,
00319                                         const void *data, size_t data_size,
00320                                         gearman_job_priority_t priority)
00321 {
00322   gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00323   char *query;
00324   size_t query_size;
00325 
00326   gearman_log_debug(server->gearman, "libdrizzle add: %.*s", (uint32_t)unique_size, (char *)unique);
00327 
00328   /* This is not used currently, it will be once batch writes are supported
00329      inside of the Gearman job server. */
00330 #if 0
00331   if (!not started)
00332   {
00333     if (_query(drizzle, "BEGIN", 5) != DRIZZLE_RETURN_OK)
00334       return REPQ_RETURN_EXTERNAL;
00335 
00336     drizzle_result_free(&(drizzle->result));
00337   }
00338 #endif
00339 
00340   query_size= ((unique_size + function_name_size + data_size) * 2) +
00341               GEARMAN_QUEUE_QUERY_BUFFER;
00342   if (query_size > queue->query_size)
00343   {
00344     query= (char *)realloc(queue->query, query_size);
00345     if (query == NULL)
00346     {
00347       gearman_log_error(server->gearman, "_libdrizzle_add", "realloc");
00348       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00349     }
00350 
00351     queue->query= query;
00352     queue->query_size= query_size;
00353   }
00354   else
00355   {
00356     query= queue->query;
00357   }
00358 
00359   query_size= (size_t)snprintf(query, query_size,
00360                                "INSERT INTO %s SET priority=%u,unique_key='",
00361                                queue->table, (uint32_t)priority);
00362 
00363   query_size+= (size_t)drizzle_escape_string(query + query_size, unique,
00364                                              unique_size);
00365   memcpy(query + query_size, "',function_name='", 17);
00366   query_size+= 17;
00367 
00368   query_size+= (size_t)drizzle_escape_string(query + query_size, function_name,
00369                                              function_name_size);
00370   memcpy(query + query_size, "',data='", 8);
00371   query_size+= 8;
00372 
00373   query_size+= (size_t)drizzle_escape_string(query + query_size, data,
00374                                              data_size);
00375   memcpy(query + query_size, "'", 1);
00376   query_size+= 1;
00377 
00378   if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00379     return GEARMAN_QUEUE_ERROR;
00380 
00381   drizzle_result_free(&(queue->result));
00382 
00383   return GEARMAN_SUCCESS;
00384 }
00385 
00386 static gearman_return_t _libdrizzle_flush(gearman_server_st *server,
00387                                           void *context __attribute__((unused)))
00388 {
00389   gearman_log_debug(server->gearman, "libdrizzle flush");
00390 
00391   return GEARMAN_SUCCESS;
00392 }
00393 
00394 static gearman_return_t _libdrizzle_done(gearman_server_st *server,
00395                                          void *context, const void *unique,
00396                                          size_t unique_size,
00397                                          const void *function_name __attribute__((unused)),
00398                                          size_t function_name_size __attribute__((unused)))
00399 {
00400   gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00401   char *query;
00402   size_t query_size;
00403 
00404   gearman_log_debug(server->gearman, "libdrizzle done: %.*s", (uint32_t)unique_size, (char *)unique);
00405 
00406   query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
00407   if (query_size > queue->query_size)
00408   {
00409     query= (char *)realloc(queue->query, query_size);
00410     if (query == NULL)
00411     {
00412       gearman_log_error(server->gearman, "_libdrizzle_add", "realloc");
00413       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00414     }
00415 
00416     queue->query= query;
00417     queue->query_size= query_size;
00418   }
00419   else
00420     query= queue->query;
00421 
00422   query_size= (size_t)snprintf(query, query_size,
00423                                "DELETE FROM %s WHERE unique_key='",
00424                                queue->table);
00425 
00426   query_size+= (size_t)drizzle_escape_string(query + query_size, unique,
00427                                              unique_size);
00428   memcpy(query + query_size, "'", 1);
00429   query_size+= 1;
00430 
00431   if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00432     return GEARMAN_QUEUE_ERROR;
00433 
00434   drizzle_result_free(&(queue->result));
00435 
00436   return GEARMAN_SUCCESS;
00437 }
00438 
00439 static gearman_return_t _libdrizzle_replay(gearman_server_st *server,
00440                                            void *context,
00441                                            gearman_queue_add_fn *add_fn,
00442                                            void *add_context)
00443 {
00444   gearman_queue_libdrizzle_st *queue= (gearman_queue_libdrizzle_st *)context;
00445   char *query;
00446   size_t query_size;
00447   drizzle_return_t ret;
00448   drizzle_row_t row;
00449   size_t *field_sizes;
00450   gearman_return_t gret;
00451 
00452   gearman_log_info(server->gearman, "libdrizzle replay start");
00453 
00454   if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
00455   {
00456     query= (char *)realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
00457     if (query == NULL)
00458     {
00459       gearman_log_error(server->gearman, "_libdrizzle_add", "realloc");
00460       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00461     }
00462 
00463     queue->query= query;
00464     queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00465   }
00466   else
00467     query= queue->query;
00468 
00469   query_size= (size_t)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
00470                                "SELECT unique_key,function_name,priority,data "
00471                                "FROM %s",
00472                                queue->table);
00473 
00474   if (_libdrizzle_query(server, queue, query, query_size) != DRIZZLE_RETURN_OK)
00475     return GEARMAN_QUEUE_ERROR;
00476 
00477   if (drizzle_column_skip(&(queue->result)) != DRIZZLE_RETURN_OK)
00478   {
00479     drizzle_result_free(&(queue->result));
00480     gearman_log_error(server->gearman, "_libdrizzle_replay", "drizzle_column_skip:%s", drizzle_error(&(queue->drizzle)));
00481 
00482     return GEARMAN_QUEUE_ERROR;
00483   }
00484 
00485   while (1)
00486   {
00487     row= drizzle_row_buffer(&(queue->result), &ret);
00488     if (ret != DRIZZLE_RETURN_OK)
00489     {
00490       drizzle_result_free(&(queue->result));
00491       gearman_log_error(server->gearman, "_libdrizzle_replay", "drizzle_row_buffer:%s", drizzle_error(&(queue->drizzle)));
00492 
00493       return GEARMAN_QUEUE_ERROR;
00494     }
00495 
00496     if (row == NULL)
00497       break;
00498 
00499     field_sizes= drizzle_row_field_sizes(&(queue->result));
00500 
00501     gearman_log_debug(server->gearman, "libdrizzle replay: %.*s", (uint32_t)field_sizes[0], row[1]);
00502 
00503     gret= (*add_fn)(server, add_context, row[0], field_sizes[0], row[1],
00504                     field_sizes[1], row[3], field_sizes[3], atoi(row[2]));
00505     if (gret != GEARMAN_SUCCESS)
00506     {
00507       drizzle_row_free(&(queue->result), row);
00508       drizzle_result_free(&(queue->result));
00509       return gret;
00510     }
00511 
00512     row[3]= NULL;
00513     drizzle_row_free(&(queue->result), row);
00514   }
00515 
00516   drizzle_result_free(&(queue->result));
00517 
00518   return GEARMAN_SUCCESS;
00519 }