00001
00002
00003
00004
00005
00006
00007
00008
#include "common.h"

#include <libgearman-server/queue_libsqlite3.h>

#include <limits.h>
#include <sqlite3.h>
00018
/* Table name used when no "table" option is supplied. */
#define GEARMAN_QUEUE_SQLITE_DEFAULT_TABLE "gearman_queue"

/* Base size, in bytes, of the reusable SQL text buffer. */
#define GEARMAN_QUEUE_QUERY_BUFFER 256

/* Size, in bytes, of the table-name field in the queue context. */
#define SQLITE_MAX_TABLE_SIZE 256

/* Size, in bytes, of the stack buffer that holds the CREATE TABLE statement. */
#define SQLITE_MAX_CREATE_TABLE_SIZE 1024
00036
00040 typedef struct
00041 {
00042 sqlite3* db;
00043 char table[SQLITE_MAX_TABLE_SIZE];
00044 char *query;
00045 size_t query_size;
00046 int in_trans;
00047 } gearman_queue_sqlite_st;
00048
00052 static int _sqlite_query(gearman_server_st *server,
00053 gearman_queue_sqlite_st *queue,
00054 const char *query, size_t query_size,
00055 sqlite3_stmt ** sth);
00056 static int _sqlite_lock(gearman_server_st *server,
00057 gearman_queue_sqlite_st *queue);
00058 static int _sqlite_commit(gearman_server_st *server,
00059 gearman_queue_sqlite_st *queue);
00060 static int _sqlite_rollback(gearman_server_st *server,
00061 gearman_queue_sqlite_st *queue);
00062
00063
00064 static gearman_return_t _sqlite_add(gearman_server_st *server, void *context,
00065 const void *unique, size_t unique_size,
00066 const void *function_name,
00067 size_t function_name_size,
00068 const void *data, size_t data_size,
00069 gearman_job_priority_t priority);
00070 static gearman_return_t _sqlite_flush(gearman_server_st *server, void *context);
00071 static gearman_return_t _sqlite_done(gearman_server_st *server, void *context,
00072 const void *unique,
00073 size_t unique_size,
00074 const void *function_name,
00075 size_t function_name_size);
00076 static gearman_return_t _sqlite_replay(gearman_server_st *server, void *context,
00077 gearman_queue_add_fn *add_fn,
00078 void *add_context);
00079
00082
00083
00084
00085
00086 gearman_return_t gearman_server_queue_libsqlite3_conf(gearman_conf_st *conf)
00087 {
00088 gearman_conf_module_st *module;
00089
00090 module= gearman_conf_module_create(conf, NULL, "libsqlite3");
00091 if (module == NULL)
00092 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00093
00094 #define MCO(__name, __value, __help) \
00095 gearman_conf_module_add_option(module, __name, 0, __value, __help);
00096
00097 MCO("db", "DB", "Database file to use.")
00098 MCO("table", "TABLE", "Table to use.")
00099
00100 return gearman_conf_return(conf);
00101 }
00102
00103 gearman_return_t gearman_server_queue_libsqlite3_init(gearman_server_st *server,
00104 gearman_conf_st *conf)
00105 {
00106 gearman_queue_sqlite_st *queue;
00107 gearman_conf_module_st *module;
00108 const char *name;
00109 const char *value;
00110 char *table= NULL;
00111 const char *query;
00112 sqlite3_stmt* sth;
00113 char create[SQLITE_MAX_CREATE_TABLE_SIZE];
00114
00115 GEARMAN_SERVER_INFO(server, "Initializing libsqlite3 module");
00116
00117 queue= malloc(sizeof(gearman_queue_sqlite_st));
00118 if (queue == NULL)
00119 {
00120 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libsqlite3_init", "malloc")
00121 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00122 }
00123
00124 memset(queue, 0, sizeof(gearman_queue_sqlite_st));
00125 snprintf(queue->table, SQLITE_MAX_TABLE_SIZE,
00126 GEARMAN_QUEUE_SQLITE_DEFAULT_TABLE);
00127
00128
00129 module= gearman_conf_module_find(conf, "libsqlite3");
00130 if (module == NULL)
00131 {
00132 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libsqlite3_init",
00133 "gearman_conf_module_find:NULL");
00134 free(queue);
00135 return GEARMAN_QUEUE_ERROR;
00136 }
00137
00138 gearman_server_set_queue_context(server, queue);
00139
00140 while (gearman_conf_module_value(module, &name, &value))
00141 {
00142 if (!strcmp(name, "db"))
00143 {
00144 if (sqlite3_open(value, &(queue->db)) != SQLITE_OK)
00145 {
00146 gearman_server_queue_libsqlite3_deinit(server);
00147 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libsqlite3_init",
00148 "Can't open database: %s\n",
00149 sqlite3_errmsg(queue->db));
00150 free(queue);
00151 return GEARMAN_QUEUE_ERROR;
00152 }
00153 }
00154 else if (!strcmp(name, "table"))
00155 snprintf(queue->table, SQLITE_MAX_TABLE_SIZE, "%s", value);
00156 else
00157 {
00158 gearman_server_queue_libsqlite3_deinit(server);
00159 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libsqlite3_init",
00160 "Unknown argument: %s", name);
00161 return GEARMAN_QUEUE_ERROR;
00162 }
00163 }
00164
00165 if (!queue->db)
00166 {
00167 gearman_server_queue_libsqlite3_deinit(server);
00168 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libsqlite3_init",
00169 "missing required --libsqlite3-db=<dbfile> argument");
00170 return GEARMAN_QUEUE_ERROR;
00171 }
00172
00173 query= "SELECT name FROM sqlite_master WHERE type='table'";
00174 if (_sqlite_query(server, queue, query, strlen(query), &sth) != SQLITE_OK)
00175 {
00176 gearman_server_queue_libsqlite3_deinit(server);
00177 return GEARMAN_QUEUE_ERROR;
00178 }
00179
00180 while (sqlite3_step(sth) == SQLITE_ROW)
00181 {
00182 if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
00183 table= (char*)sqlite3_column_text(sth, 0);
00184 else
00185 {
00186 sqlite3_finalize(sth);
00187 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libsqlite3_init",
00188 "column %d is not type TEXT", 0);
00189 return GEARMAN_QUEUE_ERROR;
00190 }
00191
00192 if (!strcasecmp(queue->table, table))
00193 {
00194 GEARMAN_SERVER_INFO(server, "sqlite module using table '%s'", table);
00195 break;
00196 }
00197 }
00198
00199 if (sqlite3_finalize(sth) != SQLITE_OK)
00200 {
00201 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libsqlite3_init",
00202 "sqlite_finalize:%s", sqlite3_errmsg(queue->db));
00203 gearman_server_queue_libsqlite3_deinit(server);
00204 return GEARMAN_QUEUE_ERROR;
00205 }
00206
00207 if (table == NULL)
00208 {
00209 snprintf(create, SQLITE_MAX_CREATE_TABLE_SIZE,
00210 "CREATE TABLE %s"
00211 "("
00212 "unique_key TEXT PRIMARY KEY,"
00213 "function_name TEXT,"
00214 "priority INTEGER,"
00215 "data BLOB"
00216 ")",
00217 queue->table);
00218
00219 GEARMAN_SERVER_INFO(server, "sqlite module creating table '%s'", queue->table);
00220
00221 if (_sqlite_query(server, queue, create, strlen(create), &sth)
00222 != SQLITE_OK)
00223 {
00224 gearman_server_queue_libsqlite3_deinit(server);
00225 return GEARMAN_QUEUE_ERROR;
00226 }
00227
00228 if (sqlite3_step(sth) != SQLITE_DONE)
00229 {
00230 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libsqlite3_init",
00231 "create table error: %s",
00232 sqlite3_errmsg(queue->db));
00233 sqlite3_finalize(sth);
00234 return GEARMAN_QUEUE_ERROR;
00235 }
00236
00237 if (sqlite3_finalize(sth) != SQLITE_OK)
00238 {
00239 GEARMAN_SERVER_ERROR_SET(server, "gearman_queue_libsqlite3_init",
00240 "sqlite_finalize:%s",
00241 sqlite3_errmsg(queue->db));
00242 gearman_server_queue_libsqlite3_deinit(server);
00243 return GEARMAN_QUEUE_ERROR;
00244 }
00245 }
00246
00247 gearman_server_set_queue_add_fn(server, _sqlite_add);
00248 gearman_server_set_queue_flush_fn(server, _sqlite_flush);
00249 gearman_server_set_queue_done_fn(server, _sqlite_done);
00250 gearman_server_set_queue_replay_fn(server, _sqlite_replay);
00251
00252 return GEARMAN_SUCCESS;
00253 }
00254
00255 gearman_return_t
00256 gearman_server_queue_libsqlite3_deinit(gearman_server_st *server)
00257 {
00258 gearman_queue_sqlite_st *queue;
00259
00260 GEARMAN_SERVER_INFO(server, "Shutting down sqlite queue module");
00261
00262 queue= (gearman_queue_sqlite_st *)gearman_server_queue_context(server);
00263 gearman_server_set_queue_context(server, NULL);
00264 sqlite3_close(queue->db);
00265 if (queue->query != NULL)
00266 free(queue->query);
00267 free(queue);
00268
00269 return GEARMAN_SUCCESS;
00270 }
00271
00272 gearman_return_t gearmand_queue_libsqlite3_init(gearmand_st *gearmand,
00273 gearman_conf_st *conf)
00274 {
00275 return gearman_server_queue_libsqlite3_init(&(gearmand->server), conf);
00276 }
00277
00278 gearman_return_t gearmand_queue_libsqlite3_deinit(gearmand_st *gearmand)
00279 {
00280 return gearman_server_queue_libsqlite3_deinit(&(gearmand->server));
00281 }
00282
00283
00284
00285
00286
00287 int _sqlite_query(gearman_server_st *server,
00288 gearman_queue_sqlite_st *queue,
00289 const char *query, size_t query_size,
00290 sqlite3_stmt ** sth)
00291 {
00292 int ret;
00293
00294 if (query_size > UINT32_MAX)
00295 {
00296 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_query", "query size too big [%u]",
00297 (uint32_t)query_size);
00298 return SQLITE_ERROR;
00299 }
00300
00301 GEARMAN_SERVER_CRAZY(server, "sqlite query: %s", query);
00302 ret= sqlite3_prepare(queue->db, query, (int)query_size, sth, NULL);
00303 if (ret != SQLITE_OK)
00304 {
00305 if (*sth)
00306 sqlite3_finalize(*sth);
00307 *sth= NULL;
00308 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_query", "sqlite_prepare:%s",
00309 sqlite3_errmsg(queue->db));
00310 }
00311
00312 return ret;
00313 }
00314
00315 int _sqlite_lock(gearman_server_st *server,
00316 gearman_queue_sqlite_st *queue)
00317 {
00318 sqlite3_stmt* sth;
00319 int ret;
00320 if (queue->in_trans)
00321 {
00322
00323 return SQLITE_OK;
00324 }
00325
00326 ret= _sqlite_query(server, queue, "BEGIN TRANSACTION",
00327 sizeof("BEGIN TRANSACTION") - 1, &sth);
00328 if (ret != SQLITE_OK)
00329 {
00330 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_lock",
00331 "failed to begin transaction: %s",
00332 sqlite3_errmsg(queue->db));
00333 if(sth)
00334 sqlite3_finalize(sth);
00335
00336 return ret;
00337 }
00338
00339 ret= sqlite3_step(sth);
00340 if (ret != SQLITE_DONE)
00341 {
00342 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_lock", "lock error: %s",
00343 sqlite3_errmsg(queue->db));
00344 sqlite3_finalize(sth);
00345 return ret;
00346 }
00347
00348 sqlite3_finalize(sth);
00349 queue->in_trans++;
00350
00351 return SQLITE_OK;
00352 }
00353
00354 int _sqlite_commit(gearman_server_st *server,
00355 gearman_queue_sqlite_st *queue)
00356 {
00357 sqlite3_stmt* sth;
00358 int ret;
00359
00360 if (! queue->in_trans)
00361 {
00362
00363 return SQLITE_OK;
00364 }
00365
00366 ret= _sqlite_query(server, queue, "COMMIT", sizeof("COMMIT") - 1, &sth);
00367 if (ret != SQLITE_OK)
00368 {
00369 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_commit",
00370 "failed to commit transaction: %s",
00371 sqlite3_errmsg(queue->db));
00372 if(sth)
00373 sqlite3_finalize(sth);
00374 return ret;
00375 }
00376 ret= sqlite3_step(sth);
00377 if (ret != SQLITE_DONE)
00378 {
00379 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_commit", "commit error: %s",
00380 sqlite3_errmsg(queue->db));
00381 sqlite3_finalize(sth);
00382 return ret;
00383 }
00384 sqlite3_finalize(sth);
00385 queue->in_trans= 0;
00386 return SQLITE_OK;
00387 }
00388
00389 int _sqlite_rollback(gearman_server_st *server,
00390 gearman_queue_sqlite_st *queue)
00391 {
00392 sqlite3_stmt* sth;
00393 int ret;
00394 const char* query;
00395
00396 if (! queue->in_trans)
00397 {
00398
00399 return SQLITE_OK;
00400 }
00401
00402 query= "ROLLBACK";
00403 ret= _sqlite_query(server, queue, query, strlen(query), &sth);
00404 if (ret != SQLITE_OK)
00405 {
00406 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_rollback",
00407 "failed to rollback transaction: %s",
00408 sqlite3_errmsg(queue->db));
00409 if(sth)
00410 sqlite3_finalize(sth);
00411 return ret;
00412 }
00413 ret= sqlite3_step(sth);
00414 if (ret != SQLITE_DONE)
00415 {
00416 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_rollback", "rollback error: %s",
00417 sqlite3_errmsg(queue->db));
00418 sqlite3_finalize(sth);
00419 return ret;
00420 }
00421 sqlite3_finalize(sth);
00422 queue->in_trans= 0;
00423 return SQLITE_OK;
00424 }
00425
00426 static gearman_return_t _sqlite_add(gearman_server_st *server, void *context,
00427 const void *unique, size_t unique_size,
00428 const void *function_name,
00429 size_t function_name_size,
00430 const void *data, size_t data_size,
00431 gearman_job_priority_t priority)
00432 {
00433 gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)context;
00434 char *query;
00435 size_t query_size;
00436 sqlite3_stmt* sth;
00437
00438 if (unique_size > UINT32_MAX || function_name_size > UINT32_MAX ||
00439 data_size > UINT32_MAX)
00440 {
00441 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_add", "size too big [%u]",
00442 (uint32_t)unique_size);
00443 return SQLITE_ERROR;
00444 }
00445
00446 GEARMAN_SERVER_DEBUG(server, "sqlite add: %.*s", (uint32_t)unique_size,
00447 (char *)unique);
00448
00449 if (_sqlite_lock(server, queue) != SQLITE_OK)
00450 return GEARMAN_QUEUE_ERROR;
00451
00452 query_size= ((unique_size + function_name_size + data_size) * 2) +
00453 GEARMAN_QUEUE_QUERY_BUFFER;
00454 if (query_size > queue->query_size)
00455 {
00456 query= realloc(queue->query, query_size);
00457 if (query == NULL)
00458 {
00459 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_add", "realloc")
00460 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00461 }
00462
00463 queue->query= query;
00464 queue->query_size= query_size;
00465 }
00466 else
00467 query= queue->query;
00468
00469 query_size= (size_t)snprintf(query, query_size,
00470 "INSERT INTO %s (priority,unique_key,"
00471 "function_name,data) VALUES (?,?,?,?)",
00472 queue->table);
00473
00474 if (_sqlite_query(server, queue, query, query_size, &sth) != SQLITE_OK)
00475 return GEARMAN_QUEUE_ERROR;
00476
00477 if (sqlite3_bind_int(sth, 1, priority) != SQLITE_OK)
00478 {
00479 _sqlite_rollback(server, queue);
00480 GEARMAN_SERVER_ERROR_SET(server,
00481 "_sqlite_add", "failed to bind int [%d]: %s",
00482 priority, sqlite3_errmsg(queue->db));
00483 sqlite3_finalize(sth);
00484 return GEARMAN_QUEUE_ERROR;
00485 }
00486
00487 if (sqlite3_bind_text(sth, 2, unique, (int)unique_size,
00488 SQLITE_TRANSIENT) != SQLITE_OK)
00489 {
00490 _sqlite_rollback(server, queue);
00491 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_add",
00492 "failed to bind text [%.*s]: %s",
00493 (uint32_t)unique_size, (char*)unique,
00494 sqlite3_errmsg(queue->db));
00495 sqlite3_finalize(sth);
00496 return GEARMAN_QUEUE_ERROR;
00497 }
00498
00499 if (sqlite3_bind_text(sth, 3, function_name, (int)function_name_size,
00500 SQLITE_TRANSIENT) != SQLITE_OK)
00501 {
00502 _sqlite_rollback(server, queue);
00503 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_add",
00504 "failed to bind text [%.*s]: %s",
00505 (uint32_t)function_name_size, (char*)function_name,
00506 sqlite3_errmsg(queue->db));
00507 sqlite3_finalize(sth);
00508 return GEARMAN_QUEUE_ERROR;
00509 }
00510
00511 if (sqlite3_bind_blob(sth, 4, data, (int)data_size,
00512 SQLITE_TRANSIENT) != SQLITE_OK)
00513 {
00514 _sqlite_rollback(server, queue);
00515 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_add", "failed to bind blob: %s",
00516 sqlite3_errmsg(queue->db));
00517 sqlite3_finalize(sth);
00518 return GEARMAN_QUEUE_ERROR;
00519 }
00520
00521 if (sqlite3_step(sth) != SQLITE_DONE)
00522 {
00523 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_add", "insert error: %s",
00524 sqlite3_errmsg(queue->db));
00525 if (sqlite3_finalize(sth) != SQLITE_OK )
00526 {
00527 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_add", "finalize error: %s",
00528 sqlite3_errmsg(queue->db));
00529 }
00530
00531 return GEARMAN_QUEUE_ERROR;
00532 }
00533
00534 sqlite3_finalize(sth);
00535
00536 if (_sqlite_commit(server, queue) != SQLITE_OK)
00537 return GEARMAN_QUEUE_ERROR;
00538
00539 return GEARMAN_SUCCESS;
00540 }
00541
00542 static gearman_return_t _sqlite_flush(gearman_server_st *server,
00543 void *context __attribute__((unused)))
00544 {
00545 GEARMAN_SERVER_DEBUG(server, "sqlite flush");
00546
00547 return GEARMAN_SUCCESS;
00548 }
00549
00550 static gearman_return_t _sqlite_done(gearman_server_st *server, void *context,
00551 const void *unique,
00552 size_t unique_size,
00553 const void *function_name __attribute__((unused)),
00554 size_t function_name_size __attribute__((unused)))
00555 {
00556 gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)context;
00557 char *query;
00558 size_t query_size;
00559 sqlite3_stmt* sth;
00560
00561 if (unique_size > UINT32_MAX)
00562 {
00563 GEARMAN_SERVER_ERROR_SET(server,
00564 "_sqlite_query", "unique key size too big [%u]",
00565 (uint32_t)unique_size);
00566 return SQLITE_ERROR;
00567 }
00568
00569 GEARMAN_SERVER_DEBUG(server, "sqlite done: %.*s", (uint32_t)unique_size,
00570 (char *)unique);
00571
00572 if (_sqlite_lock(server, queue) != SQLITE_OK)
00573 return GEARMAN_QUEUE_ERROR;
00574
00575 query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
00576 if (query_size > queue->query_size)
00577 {
00578 query= realloc(queue->query, query_size);
00579 if (query == NULL)
00580 {
00581 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_add", "realloc")
00582 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00583 }
00584
00585 queue->query= query;
00586 queue->query_size= query_size;
00587 }
00588 else
00589 query= queue->query;
00590
00591 query_size= (size_t)snprintf(query, query_size,
00592 "DELETE FROM %s WHERE unique_key=?",
00593 queue->table);
00594
00595 if (_sqlite_query(server, queue, query, query_size, &sth) != SQLITE_OK)
00596 return GEARMAN_QUEUE_ERROR;
00597
00598 sqlite3_bind_text(sth, 1, unique, (int)unique_size, SQLITE_TRANSIENT);
00599
00600 if (sqlite3_step(sth) != SQLITE_DONE)
00601 {
00602 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_done", "delete error: %s",
00603 sqlite3_errmsg(queue->db));
00604 sqlite3_finalize(sth);
00605 return GEARMAN_QUEUE_ERROR;
00606 }
00607
00608 sqlite3_finalize(sth);
00609
00610 if (_sqlite_commit(server, queue) != SQLITE_OK)
00611 return GEARMAN_QUEUE_ERROR;
00612
00613 return GEARMAN_SUCCESS;
00614 }
00615
00616 static gearman_return_t _sqlite_replay(gearman_server_st *server, void *context,
00617 gearman_queue_add_fn *add_fn,
00618 void *add_context)
00619 {
00620 gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)context;
00621 char *query;
00622 size_t query_size;
00623 sqlite3_stmt* sth;
00624 gearman_return_t gret;
00625
00626 GEARMAN_SERVER_INFO(server, "sqlite replay start")
00627
00628 if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
00629 {
00630 query= realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
00631 if (query == NULL)
00632 {
00633 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_replay", "realloc")
00634 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00635 }
00636
00637 queue->query= query;
00638 queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00639 }
00640 else
00641 query= queue->query;
00642
00643 query_size= (size_t)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
00644 "SELECT unique_key,function_name,priority,data "
00645 "FROM %s",
00646 queue->table);
00647
00648 if (_sqlite_query(server, queue, query, query_size, &sth) != SQLITE_OK)
00649 return GEARMAN_QUEUE_ERROR;
00650 while (sqlite3_step(sth) == SQLITE_ROW)
00651 {
00652 const void *unique, *function_name;
00653 void *data;
00654 size_t unique_size, function_name_size, data_size;
00655 gearman_job_priority_t priority;
00656
00657 if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
00658 {
00659 unique= sqlite3_column_text(sth,0);
00660 unique_size= (size_t) sqlite3_column_bytes(sth,0);
00661 }
00662 else
00663 {
00664 sqlite3_finalize(sth);
00665 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_replay",
00666 "column %d is not type TEXT", 0);
00667 return GEARMAN_QUEUE_ERROR;
00668 }
00669
00670 if (sqlite3_column_type(sth,1) == SQLITE_TEXT)
00671 {
00672 function_name= sqlite3_column_text(sth,1);
00673 function_name_size= (size_t)sqlite3_column_bytes(sth,1);
00674 }
00675 else
00676 {
00677 sqlite3_finalize(sth);
00678 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_replay",
00679 "column %d is not type TEXT", 1);
00680 return GEARMAN_QUEUE_ERROR;
00681 }
00682
00683 if (sqlite3_column_type(sth,2) == SQLITE_INTEGER)
00684 {
00685 priority= (double)sqlite3_column_int64(sth,2);
00686 }
00687 else
00688 {
00689 sqlite3_finalize(sth);
00690 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_replay",
00691 "column %d is not type INTEGER", 2);
00692 return GEARMAN_QUEUE_ERROR;
00693 }
00694
00695 if (sqlite3_column_type(sth,3) == SQLITE_BLOB)
00696 {
00697 data_size= (size_t)sqlite3_column_bytes(sth,3);
00698
00699 data= malloc(data_size);
00700 if (data == NULL)
00701 {
00702 sqlite3_finalize(sth);
00703 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_replay", "malloc");
00704 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00705 }
00706 memcpy(data, sqlite3_column_blob(sth,3), data_size);
00707 }
00708 else
00709 {
00710 sqlite3_finalize(sth);
00711 GEARMAN_SERVER_ERROR_SET(server, "_sqlite_replay",
00712 "column %d is not type TEXT", 3);
00713 return GEARMAN_QUEUE_ERROR;
00714 }
00715
00716 GEARMAN_SERVER_DEBUG(server, "sqlite replay: %s", (char*)function_name);
00717
00718 gret= (*add_fn)(server, add_context,
00719 unique, unique_size,
00720 function_name, function_name_size,
00721 data, data_size,
00722 priority);
00723 if (gret != GEARMAN_SUCCESS)
00724 {
00725 sqlite3_finalize(sth);
00726 return gret;
00727 }
00728 }
00729
00730 sqlite3_finalize(sth);
00731
00732 return GEARMAN_SUCCESS;
00733 }