00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016 #include <libgearman-server/queue_libpq.h>
00017
00018 #if defined(HAVE_LIBPQ_FE_H)
00019 # include <libpq-fe.h>
00020 # include <pg_config_manual.h>
00021 #else
00022 # include <postgresql/libpq-fe.h>
00023 # include <postgresql/pg_config_manual.h>
00024 #endif
00025
00035 #define GEARMAN_QUEUE_LIBPQ_DEFAULT_TABLE "queue"
00036 #define GEARMAN_QUEUE_QUERY_BUFFER 256
00037
00038
00039
00040
00041
00045 typedef struct
00046 {
00047 PGconn *con;
00048 char table[NAMEDATALEN];
00049 char *query;
00050 size_t query_size;
00051 } gearman_queue_libpq_st;
00052
00056 static void _libpq_notice_processor(void *arg, const char *message);
00057
00058
00059 static gearman_return_t _libpq_add(gearman_server_st *server, void *context,
00060 const void *unique, size_t unique_size,
00061 const void *function_name,
00062 size_t function_name_size,
00063 const void *data, size_t data_size,
00064 gearman_job_priority_t priority);
00065 static gearman_return_t _libpq_flush(gearman_server_st *server, void *context);
00066 static gearman_return_t _libpq_done(gearman_server_st *server, void *context,
00067 const void *unique,
00068 size_t unique_size,
00069 const void *function_name,
00070 size_t function_name_size);
00071 static gearman_return_t _libpq_replay(gearman_server_st *server, void *context,
00072 gearman_queue_add_fn *add_fn,
00073 void *add_context);
00074
00077
00078
00079
00080
00081 gearman_return_t gearman_server_queue_libpq_conf(gearman_conf_st *conf)
00082 {
00083 gearman_conf_module_st *module;
00084
00085 module= gearman_conf_module_create(conf, NULL, "libpq");
00086 if (module == NULL)
00087 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00088
00089 #define MCO(__name, __value, __help) \
00090 gearman_conf_module_add_option(module, __name, 0, __value, __help);
00091
00092 MCO("conninfo", "STRING", "PostgreSQL connection information string.")
00093 MCO("table", "TABLE", "Table to use.")
00094
00095 return gearman_conf_return(conf);
00096 }
00097
00098 gearman_return_t gearman_server_queue_libpq_init(gearman_server_st *server,
00099 gearman_conf_st *conf)
00100 {
00101 gearman_queue_libpq_st *queue;
00102 gearman_conf_module_st *module;
00103 const char *name;
00104 const char *value;
00105 const char *conninfo= "";
00106 char create[1024];
00107 PGresult *result;
00108
00109 GEARMAN_SERVER_INFO(server, "Initializing libpq module")
00110
00111 queue= malloc(sizeof(gearman_queue_libpq_st));
00112 if (queue == NULL)
00113 {
00114 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libpq_init", "malloc")
00115 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00116 }
00117
00118 memset(queue, 0, sizeof(gearman_queue_libpq_st));
00119 snprintf(queue->table, NAMEDATALEN, GEARMAN_QUEUE_LIBPQ_DEFAULT_TABLE);
00120
00121 gearman_server_set_queue_context(server, queue);
00122
00123
00124 module= gearman_conf_module_find(conf, "libpq");
00125 if (module == NULL)
00126 {
00127 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libpq_init",
00128 "gearman_conf_module_find:NULL")
00129 return GEARMAN_QUEUE_ERROR;
00130 }
00131
00132 while (gearman_conf_module_value(module, &name, &value))
00133 {
00134 if (!strcmp(name, "conninfo"))
00135 conninfo= value;
00136 else if (!strcmp(name, "table"))
00137 snprintf(queue->table, NAMEDATALEN, "%s", value);
00138 else
00139 {
00140 gearman_server_queue_libpq_deinit(server);
00141 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libpq_init",
00142 "Unknown argument: %s", name)
00143 return GEARMAN_QUEUE_ERROR;
00144 }
00145 }
00146
00147 queue->con= PQconnectdb(conninfo);
00148
00149 if (queue->con == NULL || PQstatus(queue->con) != CONNECTION_OK)
00150 {
00151 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libpq_init",
00152 "PQconnectdb: %s", PQerrorMessage(queue->con))
00153 gearman_server_queue_libpq_deinit(server);
00154 return GEARMAN_QUEUE_ERROR;
00155 }
00156
00157 (void)PQsetNoticeProcessor(queue->con, _libpq_notice_processor, server);
00158
00159 snprintf(create, 1024, "SELECT tablename FROM pg_tables WHERE tablename='%s'",
00160 queue->table);
00161
00162 result= PQexec(queue->con, create);
00163 if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
00164 {
00165 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libpq_init", "PQexec:%s",
00166 PQerrorMessage(queue->con))
00167 PQclear(result);
00168 gearman_server_queue_libpq_deinit(server);
00169 return GEARMAN_QUEUE_ERROR;
00170 }
00171
00172 if (PQntuples(result) == 0)
00173 {
00174 PQclear(result);
00175
00176 snprintf(create, 1024,
00177 "CREATE TABLE %s"
00178 "("
00179 "unique_key VARCHAR(%d) PRIMARY KEY,"
00180 "function_name VARCHAR(255),"
00181 "priority INTEGER,"
00182 "data BYTEA"
00183 ")",
00184 queue->table, GEARMAN_UNIQUE_SIZE);
00185
00186 GEARMAN_SERVER_INFO(server, "libpq module creating table '%s'",
00187 queue->table)
00188
00189 result= PQexec(queue->con, create);
00190 if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
00191 {
00192 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libpq_init", "PQexec:%s",
00193 PQerrorMessage(queue->con))
00194 PQclear(result);
00195 gearman_server_queue_libpq_deinit(server);
00196 return GEARMAN_QUEUE_ERROR;
00197 }
00198
00199 PQclear(result);
00200 }
00201 else
00202 PQclear(result);
00203
00204 gearman_server_set_queue_add_fn(server, _libpq_add);
00205 gearman_server_set_queue_flush_fn(server, _libpq_flush);
00206 gearman_server_set_queue_done_fn(server, _libpq_done);
00207 gearman_server_set_queue_replay_fn(server, _libpq_replay);
00208
00209 return GEARMAN_SUCCESS;
00210 }
00211
00212 gearman_return_t gearman_server_queue_libpq_deinit(gearman_server_st *server)
00213 {
00214 gearman_queue_libpq_st *queue;
00215
00216 GEARMAN_SERVER_INFO(server, "Shutting down libpq queue module")
00217
00218 queue= (gearman_queue_libpq_st *)gearman_server_queue_context(server);
00219 gearman_server_set_queue_context(server, NULL);
00220
00221 if (queue->con != NULL)
00222 PQfinish(queue->con);
00223
00224 if (queue->query != NULL)
00225 free(queue->query);
00226
00227 free(queue);
00228
00229 return GEARMAN_SUCCESS;
00230 }
00231
00232 gearman_return_t gearmand_queue_libpq_init(gearmand_st *gearmand,
00233 gearman_conf_st *conf)
00234 {
00235 return gearman_server_queue_libpq_init(&(gearmand->server), conf);
00236 }
00237
00238 gearman_return_t gearmand_queue_libpq_deinit(gearmand_st *gearmand)
00239 {
00240 return gearman_server_queue_libpq_deinit(&(gearmand->server));
00241 }
00242
00243
00244
00245
00246
00247 static void _libpq_notice_processor(void *arg, const char *message)
00248 {
00249 gearman_server_st *server= (gearman_server_st *)arg;
00250 GEARMAN_SERVER_INFO(server, "PostgreSQL %s", message)
00251 }
00252
00253 static gearman_return_t _libpq_add(gearman_server_st *server, void *context,
00254 const void *unique, size_t unique_size,
00255 const void *function_name,
00256 size_t function_name_size,
00257 const void *data, size_t data_size,
00258 gearman_job_priority_t priority)
00259 {
00260 gearman_queue_libpq_st *queue= (gearman_queue_libpq_st *)context;
00261 char *query;
00262 size_t query_size;
00263 PGresult *result;
00264
00265 const char *param_values[3]= { (char *)unique,
00266 (char *)function_name,
00267 (char *)data };
00268 int param_lengths[3]= { (int)unique_size,
00269 (int)function_name_size,
00270 (int)data_size };
00271 int param_formats[3]= { 0, 0, 1 };
00272
00273 GEARMAN_SERVER_DEBUG(server, "libpq add: %.*s", (uint32_t)unique_size,
00274 (char *)unique)
00275
00276
00277
00278 #if 0
00279 if (!not started)
00280 {
00281 if (_query(pq, "BEGIN", 5) != PQ_RETURN_OK)
00282 return REPQ_RETURN_EXTERNAL;
00283
00284 pq_result_free(&(pq->result));
00285 }
00286 #endif
00287
00288 query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00289 if (query_size > queue->query_size)
00290 {
00291 query= realloc(queue->query, query_size);
00292 if (query == NULL)
00293 {
00294 GEARMAN_SERVER_ERROR_SET(server, "_libpq_add", "realloc")
00295 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00296 }
00297
00298 queue->query= query;
00299 queue->query_size= query_size;
00300 }
00301 else
00302 query= queue->query;
00303
00304 (void)snprintf(query, query_size,
00305 "INSERT INTO %s (priority,unique_key,function_name,data) "
00306 "VALUES(%u,$1,$2,$3)", queue->table, (uint32_t)priority);
00307 result= PQexecParams(queue->con, query, 3, NULL, param_values, param_lengths,
00308 param_formats, 0);
00309 if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
00310 {
00311 GEARMAN_SERVER_ERROR_SET(server, "_libpq_command", "PQexec:%s",
00312 PQerrorMessage(queue->con))
00313 PQclear(result);
00314 return GEARMAN_QUEUE_ERROR;
00315 }
00316
00317 PQclear(result);
00318
00319 return GEARMAN_SUCCESS;
00320 }
00321
00322 static gearman_return_t _libpq_flush(gearman_server_st *server,
00323 void *context __attribute__((unused)))
00324 {
00325 GEARMAN_SERVER_DEBUG(server, "libpq flush")
00326
00327 return GEARMAN_SUCCESS;
00328 }
00329
00330 static gearman_return_t _libpq_done(gearman_server_st *server, void *context,
00331 const void *unique,
00332 size_t unique_size,
00333 const void *function_name __attribute__((unused)),
00334 size_t function_name_size __attribute__((unused)))
00335 {
00336 gearman_queue_libpq_st *queue= (gearman_queue_libpq_st *)context;
00337 char *query;
00338 size_t query_size;
00339 PGresult *result;
00340
00341 GEARMAN_SERVER_DEBUG(server, "libpq done: %.*s", (uint32_t)unique_size,
00342 (char *)unique)
00343
00344 query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
00345 if (query_size > queue->query_size)
00346 {
00347 query= realloc(queue->query, query_size);
00348 if (query == NULL)
00349 {
00350 GEARMAN_SERVER_ERROR_SET(server, "_libpq_add", "realloc")
00351 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00352 }
00353
00354 queue->query= query;
00355 queue->query_size= query_size;
00356 }
00357 else
00358 query= queue->query;
00359
00360 query_size= (size_t)snprintf(query, query_size,
00361 "DELETE FROM %s WHERE unique_key='",
00362 queue->table);
00363
00364 query_size+= PQescapeStringConn(queue->con, query + query_size, unique,
00365 unique_size, NULL);
00366 memcpy(query + query_size, "'", 1);
00367 query_size+= 1;
00368 query[query_size]= 0;
00369
00370 result= PQexec(queue->con, query);
00371 if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
00372 {
00373 GEARMAN_SERVER_ERROR_SET(server, "_libpq_add", "PQexec:%s",
00374 PQerrorMessage(queue->con))
00375 PQclear(result);
00376 return GEARMAN_QUEUE_ERROR;
00377 }
00378
00379 PQclear(result);
00380
00381 return GEARMAN_SUCCESS;
00382 }
00383
00384 static gearman_return_t _libpq_replay(gearman_server_st *server, void *context,
00385 gearman_queue_add_fn *add_fn,
00386 void *add_context)
00387 {
00388 gearman_queue_libpq_st *queue= (gearman_queue_libpq_st *)context;
00389 char *query;
00390 gearman_return_t ret;
00391 PGresult *result;
00392 int row;
00393 void *data;
00394
00395 GEARMAN_SERVER_INFO(server, "libpq replay start")
00396
00397 if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
00398 {
00399 query= realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
00400 if (query == NULL)
00401 {
00402 GEARMAN_SERVER_ERROR_SET(server, "_libpq_replay", "realloc")
00403 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00404 }
00405
00406 queue->query= query;
00407 queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00408 }
00409 else
00410 query= queue->query;
00411
00412 (void)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
00413 "SELECT unique_key,function_name,priority,data FROM %s",
00414 queue->table);
00415
00416 result= PQexecParams(queue->con, query, 0, NULL, NULL, NULL, NULL, 1);
00417 if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
00418 {
00419 GEARMAN_SERVER_ERROR_SET(server, "_libpq_replay", "PQexecParams:%s",
00420 PQerrorMessage(queue->con))
00421 PQclear(result);
00422 return GEARMAN_QUEUE_ERROR;
00423 }
00424
00425 for (row= 0; row < PQntuples(result); row++)
00426 {
00427 GEARMAN_SERVER_DEBUG(server, "libpq replay: %.*s",
00428 PQgetlength(result, row, 0),
00429 PQgetvalue(result, row, 0));
00430
00431 if (PQgetlength(result, row, 3) == 0)
00432 data= NULL;
00433 else
00434 {
00435 data= malloc((size_t)PQgetlength(result, row, 3));
00436 if (query == NULL)
00437 {
00438 PQclear(result);
00439 GEARMAN_SERVER_ERROR_SET(server, "_libpq_replay", "malloc")
00440 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00441 }
00442
00443 memcpy(data, PQgetvalue(result, row, 3),
00444 (size_t)PQgetlength(result, row, 3));
00445 }
00446
00447 ret= (*add_fn)(server, add_context, PQgetvalue(result, row, 0),
00448 (size_t)PQgetlength(result, row, 0),
00449 PQgetvalue(result, row, 1),
00450 (size_t)PQgetlength(result, row, 1),
00451 data, (size_t)PQgetlength(result, row, 3),
00452 atoi(PQgetvalue(result, row, 2)));
00453 if (ret != GEARMAN_SUCCESS)
00454 {
00455 PQclear(result);
00456 return ret;
00457 }
00458 }
00459
00460 PQclear(result);
00461
00462 return GEARMAN_SUCCESS;
00463 }