Gearman Developer Documentation

libgearman-server/queue_libpq.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00016 #include <libgearman-server/queue_libpq.h>
00017 
00018 #if defined(HAVE_LIBPQ_FE_H)
00019 # include <libpq-fe.h>
00020 # include <pg_config_manual.h>
00021 #else
00022 # include <postgresql/libpq-fe.h>
00023 # include <postgresql/pg_config_manual.h>
00024 #endif
00025 
/* Table name used unless the "table" module option overrides it. */
00035 #define GEARMAN_QUEUE_LIBPQ_DEFAULT_TABLE "queue"
/* Base size in bytes of the reusable SQL query buffer; callers grow the
   buffer beyond this when a statement needs more room. */
00036 #define GEARMAN_QUEUE_QUERY_BUFFER 256
00037 
/* State kept by the libpq queue module; stored as the server's queue
   context and handed back to every queue callback. */
00041 typedef struct
00042 {
00043   PGconn *con; /* Connection returned by PQconnectdb(); closed with PQfinish(). */
00044   char table[NAMEDATALEN]; /* Name of the table that holds queued jobs. */
00045   char *query; /* Heap buffer reused for building SQL text; grown with realloc(). */
00046   size_t query_size; /* Current allocated size of query, in bytes. */
00047 } gearman_queue_libpq_st;
00048 
/* Notice processor installed with PQsetNoticeProcessor(); arg is the
   gearman_server_st pointer given at registration. */
00052 static void _libpq_notice_processor(void *arg, const char *message);
00053 
00054 /* Queue callback functions. */
/* Insert one job (unique key, function name, data, priority) into the table. */
00055 static gearman_return_t _libpq_add(gearman_server_st *server, void *context,
00056                                    const void *unique, size_t unique_size,
00057                                    const void *function_name,
00058                                    size_t function_name_size,
00059                                    const void *data, size_t data_size,
00060                                    gearman_job_priority_t priority);
/* Flush hook; the current implementation only logs and returns success. */
00061 static gearman_return_t _libpq_flush(gearman_server_st *server, void *context);
/* Delete the row whose unique_key matches the given unique value. */
00062 static gearman_return_t _libpq_done(gearman_server_st *server, void *context,
00063                                     const void *unique,
00064                                     size_t unique_size,
00065                                     const void *function_name,
00066                                     size_t function_name_size);
/* Read every stored row and pass it to add_fn. */
00067 static gearman_return_t _libpq_replay(gearman_server_st *server, void *context,
00068                                       gearman_queue_add_fn *add_fn,
00069                                       void *add_context);
00070 
00073 /*
00074  * Public definitions
00075  */
00076 
00077 gearman_return_t gearman_server_queue_libpq_conf(gearman_conf_st *conf)
00078 {
00079   gearman_conf_module_st *module;
00080 
00081   module= gearman_conf_module_create(conf, NULL, "libpq");
00082   if (module == NULL)
00083     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00084 
00085 #define MCO(__name, __value, __help) \
00086   gearman_conf_module_add_option(module, __name, 0, __value, __help);
00087 
00088   MCO("conninfo", "STRING", "PostgreSQL connection information string.")
00089   MCO("table", "TABLE", "Table to use.")
00090 
00091   return gearman_conf_return(conf);
00092 }
00093 
00094 gearman_return_t gearman_server_queue_libpq_init(gearman_server_st *server,
00095                                                  gearman_conf_st *conf)
00096 {
00097   gearman_queue_libpq_st *queue;
00098   gearman_conf_module_st *module;
00099   const char *name;
00100   const char *value;
00101   const char *conninfo= "";
00102   char create[1024];
00103   PGresult *result;
00104 
00105   gearman_log_info(server->gearman, "Initializing libpq module");
00106 
00107   queue= (gearman_queue_libpq_st *)malloc(sizeof(gearman_queue_libpq_st));
00108   if (queue == NULL)
00109   {
00110     gearman_log_error(server->gearman, "gearman_queue_libpq_init", "malloc");
00111     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00112   }
00113 
00114   memset(queue, 0, sizeof(gearman_queue_libpq_st));
00115   snprintf(queue->table, NAMEDATALEN, GEARMAN_QUEUE_LIBPQ_DEFAULT_TABLE);
00116 
00117   gearman_server_set_queue_context(server, queue);
00118 
00119   /* Get module and parse the option values that were given. */
00120   module= gearman_conf_module_find(conf, "libpq");
00121   if (module == NULL)
00122   {
00123     gearman_log_error(server->gearman, "gearman_queue_libpq_init", "gearman_conf_module_find:NULL");
00124     return GEARMAN_QUEUE_ERROR;
00125   }
00126 
00127   while (gearman_conf_module_value(module, &name, &value))
00128   {
00129     if (!strcmp(name, "conninfo"))
00130       conninfo= value;
00131     else if (!strcmp(name, "table"))
00132       snprintf(queue->table, NAMEDATALEN, "%s", value);
00133     else
00134     {
00135       gearman_server_queue_libpq_deinit(server);
00136       gearman_log_error(server->gearman, "gearman_queue_libpq_init", "Unknown argument: %s", name);
00137       return GEARMAN_QUEUE_ERROR;
00138     }
00139   }
00140 
00141   queue->con= PQconnectdb(conninfo);
00142 
00143   if (queue->con == NULL || PQstatus(queue->con) != CONNECTION_OK)
00144   {
00145     gearman_log_error(server->gearman, "gearman_queue_libpq_init",
00146                              "PQconnectdb: %s", PQerrorMessage(queue->con));
00147     gearman_server_queue_libpq_deinit(server);
00148     return GEARMAN_QUEUE_ERROR;
00149   }
00150 
00151   (void)PQsetNoticeProcessor(queue->con, _libpq_notice_processor, server);
00152 
00153   snprintf(create, 1024, "SELECT tablename FROM pg_tables WHERE tablename='%s'",
00154            queue->table);
00155 
00156   result= PQexec(queue->con, create);
00157   if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
00158   {
00159     gearman_log_error(server->gearman, "gearman_queue_libpq_init", "PQexec:%s",
00160                              PQerrorMessage(queue->con));
00161     PQclear(result);
00162     gearman_server_queue_libpq_deinit(server);
00163     return GEARMAN_QUEUE_ERROR;
00164   }
00165 
00166   if (PQntuples(result) == 0)
00167   {
00168     PQclear(result);
00169 
00170     snprintf(create, 1024,
00171              "CREATE TABLE %s"
00172              "("
00173                "unique_key VARCHAR(%d) PRIMARY KEY,"
00174                "function_name VARCHAR(255),"
00175                "priority INTEGER,"
00176                "data BYTEA"
00177              ")",
00178              queue->table, GEARMAN_UNIQUE_SIZE);
00179 
00180     gearman_log_info(server->gearman, "libpq module creating table '%s'", queue->table);
00181 
00182     result= PQexec(queue->con, create);
00183     if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
00184     {
00185       gearman_log_error(server->gearman, "gearman_queue_libpq_init", "PQexec:%s",
00186                                PQerrorMessage(queue->con));
00187       PQclear(result);
00188       gearman_server_queue_libpq_deinit(server);
00189       return GEARMAN_QUEUE_ERROR;
00190     }
00191 
00192     PQclear(result);
00193   }
00194   else
00195     PQclear(result);
00196 
00197   gearman_server_set_queue_add_fn(server, _libpq_add);
00198   gearman_server_set_queue_flush_fn(server, _libpq_flush);
00199   gearman_server_set_queue_done_fn(server, _libpq_done);
00200   gearman_server_set_queue_replay_fn(server, _libpq_replay);
00201 
00202   return GEARMAN_SUCCESS;
00203 }
00204 
00205 gearman_return_t gearman_server_queue_libpq_deinit(gearman_server_st *server)
00206 {
00207   gearman_queue_libpq_st *queue;
00208 
00209   gearman_log_info(server->gearman, "Shutting down libpq queue module");
00210 
00211   queue= (gearman_queue_libpq_st *)gearman_server_queue_context(server);
00212   gearman_server_set_queue_context(server, NULL);
00213 
00214   if (queue->con != NULL)
00215     PQfinish(queue->con);
00216 
00217   if (queue->query != NULL)
00218     free(queue->query);
00219 
00220   free(queue);
00221 
00222   return GEARMAN_SUCCESS;
00223 }
00224 
00225 gearman_return_t gearmand_queue_libpq_init(gearmand_st *gearmand,
00226                                                 gearman_conf_st *conf)
00227 {
00228   return gearman_server_queue_libpq_init(&(gearmand->server), conf);
00229 }
00230 
00231 gearman_return_t gearmand_queue_libpq_deinit(gearmand_st *gearmand)
00232 {
00233   return gearman_server_queue_libpq_deinit(&(gearmand->server));
00234 }
00235 
00236 /*
00237  * Static definitions
00238  */
00239 
00240 static void _libpq_notice_processor(void *arg, const char *message)
00241 {
00242   gearman_server_st *server= (gearman_server_st *)arg;
00243   gearman_log_info(server->gearman, "PostgreSQL %s", message);
00244 }
00245 
00246 static gearman_return_t _libpq_add(gearman_server_st *server, void *context,
00247                                         const void *unique, size_t unique_size,
00248                                         const void *function_name,
00249                                         size_t function_name_size,
00250                                         const void *data, size_t data_size,
00251                                         gearman_job_priority_t priority)
00252 {
00253   gearman_queue_libpq_st *queue= (gearman_queue_libpq_st *)context;
00254   char *query;
00255   size_t query_size;
00256   PGresult *result;
00257 
00258   const char *param_values[3]= { (char *)unique,
00259                                  (char *)function_name,
00260                                  (char *)data };
00261   int param_lengths[3]= { (int)unique_size,
00262                           (int)function_name_size,
00263                           (int)data_size };
00264   int param_formats[3]= { 0, 0, 1 };
00265 
00266   gearman_log_debug(server->gearman, "libpq add: %.*s", (uint32_t)unique_size, (char *)unique);
00267 
00268   /* This is not used currently, it will be once batch writes are supported
00269      inside of the Gearman job server. */
00270 #if 0
00271   if (!not started)
00272   {
00273     if (_query(pq, "BEGIN", 5) != PQ_RETURN_OK)
00274       return REPQ_RETURN_EXTERNAL;
00275 
00276     pq_result_free(&(pq->result));
00277   }
00278 #endif
00279 
00280   query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00281   if (query_size > queue->query_size)
00282   {
00283     query= (char *)realloc(queue->query, query_size);
00284     if (query == NULL)
00285     {
00286       gearman_log_error(server->gearman, "_libpq_add", "realloc");
00287       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00288     }
00289 
00290     queue->query= query;
00291     queue->query_size= query_size;
00292   }
00293   else
00294     query= queue->query;
00295 
00296   (void)snprintf(query, query_size,
00297                  "INSERT INTO %s (priority,unique_key,function_name,data) "
00298                  "VALUES(%u,$1,$2,$3)", queue->table, (uint32_t)priority);
00299   result= PQexecParams(queue->con, query, 3, NULL, param_values, param_lengths,
00300                        param_formats, 0);
00301   if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
00302   {
00303     gearman_log_error(server->gearman, "_libpq_command", "PQexec:%s", PQerrorMessage(queue->con));
00304     PQclear(result);
00305     return GEARMAN_QUEUE_ERROR;
00306   }
00307 
00308   PQclear(result);
00309 
00310   return GEARMAN_SUCCESS;
00311 }
00312 
00313 static gearman_return_t _libpq_flush(gearman_server_st *server,
00314                                      void *context __attribute__((unused)))
00315 {
00316   gearman_log_debug(server->gearman, "libpq flush");
00317 
00318   return GEARMAN_SUCCESS;
00319 }
00320 
00321 static gearman_return_t _libpq_done(gearman_server_st *server, void *context,
00322                                     const void *unique,
00323                                     size_t unique_size,
00324                                     const void *function_name __attribute__((unused)),
00325                                     size_t function_name_size __attribute__((unused)))
00326 {
00327   gearman_queue_libpq_st *queue= (gearman_queue_libpq_st *)context;
00328   char *query;
00329   size_t query_size;
00330   PGresult *result;
00331 
00332   gearman_log_debug(server->gearman, "libpq done: %.*s", (uint32_t)unique_size, (char *)unique);
00333 
00334   query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
00335   if (query_size > queue->query_size)
00336   {
00337     query= (char *)realloc(queue->query, query_size);
00338     if (query == NULL)
00339     {
00340       gearman_log_error(server->gearman, "_libpq_add", "realloc");
00341       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00342     }
00343 
00344     queue->query= query;
00345     queue->query_size= query_size;
00346   }
00347   else
00348     query= queue->query;
00349 
00350   query_size= (size_t)snprintf(query, query_size,
00351                                "DELETE FROM %s WHERE unique_key='",
00352                                queue->table);
00353 
00354   query_size+= PQescapeStringConn(queue->con, query + query_size, unique,
00355                                   unique_size, NULL);
00356   memcpy(query + query_size, "'", 1);
00357   query_size+= 1;
00358   query[query_size]= 0;
00359 
00360   result= PQexec(queue->con, query);
00361   if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
00362   {
00363     gearman_log_error(server->gearman, "_libpq_add", "PQexec:%s", PQerrorMessage(queue->con));
00364     PQclear(result);
00365     return GEARMAN_QUEUE_ERROR;
00366   }
00367 
00368   PQclear(result);
00369 
00370   return GEARMAN_SUCCESS;
00371 }
00372 
00373 static gearman_return_t _libpq_replay(gearman_server_st *server, void *context,
00374                                       gearman_queue_add_fn *add_fn,
00375                                       void *add_context)
00376 {
00377   gearman_queue_libpq_st *queue= (gearman_queue_libpq_st *)context;
00378   char *query;
00379   gearman_return_t ret;
00380   PGresult *result;
00381   int row;
00382   void *data;
00383 
00384   gearman_log_info(server->gearman, "libpq replay start");
00385 
00386   if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
00387   {
00388     query= (char *)realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
00389     if (query == NULL)
00390     {
00391       gearman_log_error(server->gearman, "_libpq_replay", "realloc");
00392       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00393     }
00394 
00395     queue->query= query;
00396     queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00397   }
00398   else
00399     query= queue->query;
00400 
00401   (void)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
00402                  "SELECT unique_key,function_name,priority,data FROM %s",
00403                  queue->table);
00404 
00405   result= PQexecParams(queue->con, query, 0, NULL, NULL, NULL, NULL, 1);
00406   if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
00407   {
00408     gearman_log_error(server->gearman, "_libpq_replay", "PQexecParams:%s", PQerrorMessage(queue->con));
00409     PQclear(result);
00410     return GEARMAN_QUEUE_ERROR;
00411   }
00412 
00413   for (row= 0; row < PQntuples(result); row++)
00414   {
00415     gearman_log_debug(server->gearman, "libpq replay: %.*s",
00416                       PQgetlength(result, row, 0),
00417                       PQgetvalue(result, row, 0));
00418 
00419     if (PQgetlength(result, row, 3) == 0)
00420     {
00421       data= NULL;
00422     }
00423     else
00424     {
00425       data= (void *)malloc((size_t)PQgetlength(result, row, 3));
00426       if (query == NULL)
00427       {
00428         PQclear(result);
00429         gearman_log_error(server->gearman, "_libpq_replay", "malloc");
00430         return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00431       }
00432 
00433       memcpy(data, PQgetvalue(result, row, 3),
00434              (size_t)PQgetlength(result, row, 3));
00435     }
00436 
00437     ret= (*add_fn)(server, add_context, PQgetvalue(result, row, 0),
00438                    (size_t)PQgetlength(result, row, 0),
00439                    PQgetvalue(result, row, 1),
00440                    (size_t)PQgetlength(result, row, 1),
00441                    data, (size_t)PQgetlength(result, row, 3),
00442                    atoi(PQgetvalue(result, row, 2)));
00443     if (ret != GEARMAN_SUCCESS)
00444     {
00445       PQclear(result);
00446       return ret;
00447     }
00448   }
00449 
00450   PQclear(result);
00451 
00452   return GEARMAN_SUCCESS;
00453 }