Gearman Developer Documentation

libgearman-server/queue_libsqlite3.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2009 Cory Bennett
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00016 #include <libgearman-server/queue_libsqlite3.h>
00017 #include <sqlite3.h>
00018 
00028 #define GEARMAN_QUEUE_SQLITE_DEFAULT_TABLE "gearman_queue"
00029 #define GEARMAN_QUEUE_QUERY_BUFFER 256
00030 #define SQLITE_MAX_TABLE_SIZE 256
00031 #define SQLITE_MAX_CREATE_TABLE_SIZE 1024
00032 
00036 typedef struct
00037 {
00038   sqlite3* db;
00039   char table[SQLITE_MAX_TABLE_SIZE];
00040   char *query;
00041   size_t query_size;
00042   int in_trans;
00043 } gearman_queue_sqlite_st;
00044 
00048 static int _sqlite_query(gearman_server_st *server,
00049                          gearman_queue_sqlite_st *queue,
00050                          const char *query, size_t query_size,
00051                          sqlite3_stmt ** sth);
00052 static int _sqlite_lock(gearman_server_st *server,
00053                         gearman_queue_sqlite_st *queue);
00054 static int _sqlite_commit(gearman_server_st *server,
00055                           gearman_queue_sqlite_st *queue);
00056 static int _sqlite_rollback(gearman_server_st *server,
00057                             gearman_queue_sqlite_st *queue);
00058 
00059 /* Queue callback functions. */
00060 static gearman_return_t _sqlite_add(gearman_server_st *server, void *context,
00061                                     const void *unique, size_t unique_size,
00062                                     const void *function_name,
00063                                     size_t function_name_size,
00064                                     const void *data, size_t data_size,
00065                                     gearman_job_priority_t priority);
00066 static gearman_return_t _sqlite_flush(gearman_server_st *server, void *context);
00067 static gearman_return_t _sqlite_done(gearman_server_st *server, void *context,
00068                                      const void *unique,
00069                                      size_t unique_size,
00070                                      const void *function_name,
00071                                      size_t function_name_size);
00072 static gearman_return_t _sqlite_replay(gearman_server_st *server, void *context,
00073                                        gearman_queue_add_fn *add_fn,
00074                                        void *add_context);
00075 
00078 /*
00079  * Public definitions
00080  */
00081 
00082 gearman_return_t gearman_server_queue_libsqlite3_conf(gearman_conf_st *conf)
00083 {
00084   gearman_conf_module_st *module;
00085 
00086   module= gearman_conf_module_create(conf, NULL, "libsqlite3");
00087   if (module == NULL)
00088     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00089 
00090 #define MCO(__name, __value, __help) \
00091   gearman_conf_module_add_option(module, __name, 0, __value, __help);
00092 
00093   MCO("db", "DB", "Database file to use.")
00094   MCO("table", "TABLE", "Table to use.")
00095 
00096   return gearman_conf_return(conf);
00097 }
00098 
00099 gearman_return_t gearman_server_queue_libsqlite3_init(gearman_server_st *server,
00100                                                       gearman_conf_st *conf)
00101 {
00102   gearman_queue_sqlite_st *queue;
00103   gearman_conf_module_st *module;
00104   const char *name;
00105   const char *value;
00106   char *table= NULL;
00107   const char *query;
00108   sqlite3_stmt* sth;
00109   char create[SQLITE_MAX_CREATE_TABLE_SIZE];
00110 
00111   gearman_log_info(server->gearman, "Initializing libsqlite3 module");
00112 
00113   queue= calloc(1, sizeof(gearman_queue_sqlite_st));
00114   if (queue == NULL)
00115   {
00116     gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init", "malloc");
00117     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00118   }
00119 
00120   snprintf(queue->table, SQLITE_MAX_TABLE_SIZE,
00121            GEARMAN_QUEUE_SQLITE_DEFAULT_TABLE);
00122 
00123   /* Get module and parse the option values that were given. */
00124   module= gearman_conf_module_find(conf, "libsqlite3");
00125   if (module == NULL)
00126   {
00127     gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00128                              "gearman_conf_module_find:NULL");
00129     free(queue);
00130 
00131     return GEARMAN_QUEUE_ERROR;
00132   }
00133 
00134   gearman_server_set_queue_context(server, queue);
00135 
00136   while (gearman_conf_module_value(module, &name, &value))
00137   {
00138     if (! strcmp(name, "db"))
00139     {
00140       if (sqlite3_open(value, &(queue->db)) != SQLITE_OK)
00141       {
00142         gearman_server_queue_libsqlite3_deinit(server);
00143         gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00144                                  "Can't open database: %s\n",
00145                                  sqlite3_errmsg(queue->db));
00146         free(queue);
00147         return GEARMAN_QUEUE_ERROR;
00148       }
00149     }
00150     else if (! strcmp(name, "table"))
00151     {
00152       snprintf(queue->table, SQLITE_MAX_TABLE_SIZE, "%s", value);
00153     }
00154     else
00155     {
00156       gearman_server_queue_libsqlite3_deinit(server);
00157       gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00158                                "Unknown argument: %s", name);
00159       return GEARMAN_QUEUE_ERROR;
00160     }
00161   }
00162 
00163   if (! queue->db)
00164   {
00165     gearman_server_queue_libsqlite3_deinit(server);
00166     gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00167                              "missing required --libsqlite3-db=<dbfile> argument");
00168     return GEARMAN_QUEUE_ERROR;
00169   }
00170 
00171   query= "SELECT name FROM sqlite_master WHERE type='table'";
00172   if (_sqlite_query(server, queue, query, strlen(query), &sth) != SQLITE_OK)
00173   {
00174     gearman_server_queue_libsqlite3_deinit(server);
00175     return GEARMAN_QUEUE_ERROR;
00176   }
00177 
00178   while (sqlite3_step(sth) == SQLITE_ROW)
00179   {
00180     if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
00181       table= (char*)sqlite3_column_text(sth, 0);
00182     else
00183     {
00184       sqlite3_finalize(sth);
00185       gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00186                                "column %d is not type TEXT", 0);
00187       return GEARMAN_QUEUE_ERROR;
00188     }
00189 
00190     if (! strcasecmp(queue->table, table))
00191     {
00192       gearman_log_info(server->gearman, "sqlite module using table '%s'", table);
00193       break;
00194     }
00195   }
00196 
00197   if (sqlite3_finalize(sth) != SQLITE_OK)
00198   {
00199     gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00200                              "sqlite_finalize:%s", sqlite3_errmsg(queue->db));
00201     gearman_server_queue_libsqlite3_deinit(server);
00202     return GEARMAN_QUEUE_ERROR;
00203   }
00204 
00205   if (table == NULL)
00206   {
00207     snprintf(create, SQLITE_MAX_CREATE_TABLE_SIZE,
00208              "CREATE TABLE %s"
00209              "("
00210              "unique_key TEXT PRIMARY KEY,"
00211              "function_name TEXT,"
00212              "priority INTEGER,"
00213              "data BLOB"
00214              ")",
00215              queue->table);
00216 
00217     gearman_log_info(server->gearman, "sqlite module creating table '%s'", queue->table);
00218 
00219     if (_sqlite_query(server, queue, create, strlen(create), &sth)
00220         != SQLITE_OK)
00221     {
00222       gearman_server_queue_libsqlite3_deinit(server);
00223       return GEARMAN_QUEUE_ERROR;
00224     }
00225 
00226     if (sqlite3_step(sth) != SQLITE_DONE)
00227     {
00228       gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00229                                "create table error: %s",
00230                                sqlite3_errmsg(queue->db));
00231       sqlite3_finalize(sth);
00232       return GEARMAN_QUEUE_ERROR;
00233     }
00234 
00235     if (sqlite3_finalize(sth) != SQLITE_OK)
00236     {
00237       gearman_log_error(server->gearman, "gearman_queue_libsqlite3_init",
00238                                "sqlite_finalize:%s",
00239                                sqlite3_errmsg(queue->db));
00240       gearman_server_queue_libsqlite3_deinit(server);
00241       return GEARMAN_QUEUE_ERROR;
00242     }
00243   }
00244 
00245   gearman_server_set_queue_add_fn(server, _sqlite_add);
00246   gearman_server_set_queue_flush_fn(server, _sqlite_flush);
00247   gearman_server_set_queue_done_fn(server, _sqlite_done);
00248   gearman_server_set_queue_replay_fn(server, _sqlite_replay);
00249 
00250   return GEARMAN_SUCCESS;
00251 }
00252 
00253 gearman_return_t
00254 gearman_server_queue_libsqlite3_deinit(gearman_server_st *server)
00255 {
00256   gearman_queue_sqlite_st *queue;
00257 
00258   gearman_log_info(server->gearman, "Shutting down sqlite queue module");
00259 
00260   queue= (gearman_queue_sqlite_st *)gearman_server_queue_context(server);
00261   gearman_server_set_queue_context(server, NULL);
00262   sqlite3_close(queue->db);
00263   if (queue->query != NULL)
00264     free(queue->query);
00265   free(queue);
00266 
00267   return GEARMAN_SUCCESS;
00268 }
00269 
00270 gearman_return_t gearmand_queue_libsqlite3_init(gearmand_st *gearmand,
00271                                                 gearman_conf_st *conf)
00272 {
00273   return gearman_server_queue_libsqlite3_init(&(gearmand->server), conf);
00274 }
00275 
00276 gearman_return_t gearmand_queue_libsqlite3_deinit(gearmand_st *gearmand)
00277 {
00278   return gearman_server_queue_libsqlite3_deinit(&(gearmand->server));
00279 }
00280 
00281 /*
00282  * Static definitions
00283  */
00284 
00285 int _sqlite_query(gearman_server_st *server,
00286                   gearman_queue_sqlite_st *queue,
00287                   const char *query, size_t query_size,
00288                   sqlite3_stmt ** sth)
00289 {
00290   int ret;
00291 
00292   if (query_size > UINT32_MAX)
00293   {
00294     gearman_log_error(server->gearman, "_sqlite_query", "query size too big [%u]",
00295                              (uint32_t)query_size);
00296     return SQLITE_ERROR;
00297   }
00298 
00299   gearman_log_crazy(server->gearman, "sqlite query: %s", query);
00300   ret= sqlite3_prepare(queue->db, query, (int)query_size, sth, NULL);
00301   if (ret  != SQLITE_OK)
00302   {
00303     if (*sth)
00304     {
00305       sqlite3_finalize(*sth);
00306     }
00307     *sth= NULL;
00308     gearman_log_error(server->gearman, "_sqlite_query", "sqlite_prepare:%s",
00309                              sqlite3_errmsg(queue->db));
00310   }
00311 
00312   return ret;
00313 }
00314 
00315 int _sqlite_lock(gearman_server_st *server,
00316                  gearman_queue_sqlite_st *queue)
00317 {
00318   sqlite3_stmt* sth;
00319   int ret;
00320   if (queue->in_trans)
00321   {
00322     /* already in transaction */
00323     return SQLITE_OK;
00324   }
00325 
00326   ret= _sqlite_query(server, queue, "BEGIN TRANSACTION",
00327                      sizeof("BEGIN TRANSACTION") - 1, &sth);
00328   if (ret != SQLITE_OK)
00329   {
00330     gearman_log_error(server->gearman, "_sqlite_lock",
00331                              "failed to begin transaction: %s",
00332                              sqlite3_errmsg(queue->db));
00333     if (sth)
00334     {
00335       sqlite3_finalize(sth);
00336     }
00337 
00338     return ret;
00339   }
00340 
00341   ret= sqlite3_step(sth);
00342   if (ret != SQLITE_DONE)
00343   {
00344     gearman_log_error(server->gearman, "_sqlite_lock", "lock error: %s",
00345                              sqlite3_errmsg(queue->db));
00346     sqlite3_finalize(sth);
00347     return ret;
00348   }
00349 
00350   sqlite3_finalize(sth);
00351   queue->in_trans++;
00352 
00353   return SQLITE_OK;
00354 }
00355 
00356 int _sqlite_commit(gearman_server_st *server,
00357                    gearman_queue_sqlite_st *queue)
00358 {
00359   sqlite3_stmt* sth;
00360   int ret;
00361 
00362   if (! queue->in_trans)
00363   {
00364     /* not in transaction */
00365     return SQLITE_OK;
00366   }
00367 
00368   ret= _sqlite_query(server, queue, "COMMIT", sizeof("COMMIT") - 1, &sth);
00369   if (ret != SQLITE_OK)
00370   {
00371     gearman_log_error(server->gearman, "_sqlite_commit",
00372                              "failed to commit transaction: %s",
00373                              sqlite3_errmsg(queue->db));
00374     if (sth)
00375     {
00376       sqlite3_finalize(sth);
00377     }
00378 
00379     return ret;
00380   }
00381   ret= sqlite3_step(sth);
00382   if (ret != SQLITE_DONE)
00383   {
00384     gearman_log_error(server->gearman, "_sqlite_commit", "commit error: %s",
00385                              sqlite3_errmsg(queue->db));
00386     sqlite3_finalize(sth);
00387     return ret;
00388   }
00389   sqlite3_finalize(sth);
00390   queue->in_trans= 0;
00391   return SQLITE_OK;
00392 }
00393 
00394 int _sqlite_rollback(gearman_server_st *server,
00395                      gearman_queue_sqlite_st *queue)
00396 {
00397   sqlite3_stmt* sth;
00398   int ret;
00399   const char* query;
00400 
00401   if (! queue->in_trans)
00402   {
00403     /* not in transaction */
00404     return SQLITE_OK;
00405   }
00406 
00407   query= "ROLLBACK";
00408   ret= _sqlite_query(server, queue, query, strlen(query), &sth);
00409   if (ret != SQLITE_OK)
00410   {
00411     gearman_log_error(server->gearman, "_sqlite_rollback",
00412                              "failed to rollback transaction: %s",
00413                              sqlite3_errmsg(queue->db));
00414     if (sth)
00415     {
00416       sqlite3_finalize(sth);
00417     }
00418 
00419     return ret;
00420   }
00421   ret= sqlite3_step(sth);
00422   if (ret != SQLITE_DONE)
00423   {
00424     gearman_log_error(server->gearman, "_sqlite_rollback", "rollback error: %s",
00425                              sqlite3_errmsg(queue->db));
00426     sqlite3_finalize(sth);
00427     return ret;
00428   }
00429   sqlite3_finalize(sth);
00430   queue->in_trans= 0;
00431 
00432   return SQLITE_OK;
00433 }
00434 
00435 static gearman_return_t _sqlite_add(gearman_server_st *server, void *context,
00436                                     const void *unique, size_t unique_size,
00437                                     const void *function_name,
00438                                     size_t function_name_size,
00439                                     const void *data, size_t data_size,
00440                                     gearman_job_priority_t priority)
00441 {
00442   gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)context;
00443   char *query;
00444   size_t query_size;
00445   sqlite3_stmt* sth;
00446 
00447   if (unique_size > UINT32_MAX || function_name_size > UINT32_MAX ||
00448       data_size > UINT32_MAX)
00449   {
00450     gearman_log_error(server->gearman, "_sqlite_add", "size too big [%u]",
00451                              (uint32_t)unique_size);
00452     return SQLITE_ERROR;
00453   }
00454 
00455   gearman_log_debug(server->gearman, "sqlite add: %.*s", (uint32_t)unique_size, (char *)unique);
00456 
00457   if (_sqlite_lock(server, queue) !=  SQLITE_OK)
00458     return GEARMAN_QUEUE_ERROR;
00459 
00460   query_size= ((unique_size + function_name_size + data_size) * 2) +
00461     GEARMAN_QUEUE_QUERY_BUFFER;
00462   if (query_size > queue->query_size)
00463   {
00464     query= (char *)realloc(queue->query, query_size);
00465     if (query == NULL)
00466     {
00467       gearman_log_error(server->gearman, "_sqlite_add", "realloc");
00468       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00469     }
00470 
00471     queue->query= query;
00472     queue->query_size= query_size;
00473   }
00474   else
00475   {
00476     query= queue->query;
00477   }
00478 
00479   query_size= (size_t)snprintf(query, query_size,
00480                                "INSERT OR REPLACE INTO %s (priority,unique_key,"
00481                                "function_name,data) VALUES (?,?,?,?)",
00482                                queue->table);
00483 
00484   if (_sqlite_query(server, queue, query, query_size, &sth) != SQLITE_OK)
00485     return GEARMAN_QUEUE_ERROR;
00486 
00487   if (sqlite3_bind_int(sth,  1, priority) != SQLITE_OK)
00488   {
00489     _sqlite_rollback(server, queue);
00490     gearman_log_error(server->gearman, "_sqlite_add", "failed to bind int [%d]: %s", priority, sqlite3_errmsg(queue->db));
00491     sqlite3_finalize(sth);
00492 
00493     return GEARMAN_QUEUE_ERROR;
00494   }
00495 
00496   if (sqlite3_bind_text(sth, 2, unique, (int)unique_size,
00497                         SQLITE_TRANSIENT) != SQLITE_OK)
00498   {
00499     _sqlite_rollback(server, queue);
00500     gearman_log_error(server->gearman, "_sqlite_add",
00501                              "failed to bind text [%.*s]: %s",
00502                              (uint32_t)unique_size, (char*)unique,
00503                              sqlite3_errmsg(queue->db));
00504     sqlite3_finalize(sth);
00505     return GEARMAN_QUEUE_ERROR;
00506   }
00507 
00508   if (sqlite3_bind_text(sth, 3, function_name, (int)function_name_size,
00509                         SQLITE_TRANSIENT) != SQLITE_OK)
00510   {
00511     _sqlite_rollback(server, queue);
00512     gearman_log_error(server->gearman, "_sqlite_add",
00513                              "failed to bind text [%.*s]: %s",
00514                              (uint32_t)function_name_size, (char*)function_name,
00515                              sqlite3_errmsg(queue->db));
00516     sqlite3_finalize(sth);
00517     return GEARMAN_QUEUE_ERROR;
00518   }
00519 
00520   if (sqlite3_bind_blob(sth, 4, data, (int)data_size,
00521                         SQLITE_TRANSIENT) != SQLITE_OK)
00522   {
00523     _sqlite_rollback(server, queue);
00524     gearman_log_error(server->gearman, "_sqlite_add", "failed to bind blob: %s",
00525                              sqlite3_errmsg(queue->db));
00526     sqlite3_finalize(sth);
00527     return GEARMAN_QUEUE_ERROR;
00528   }
00529 
00530   if (sqlite3_step(sth) != SQLITE_DONE)
00531   {
00532     gearman_log_error(server->gearman, "_sqlite_add", "insert error: %s",
00533                              sqlite3_errmsg(queue->db));
00534     if (sqlite3_finalize(sth) != SQLITE_OK )
00535     {
00536       gearman_log_error(server->gearman, "_sqlite_add", "finalize error: %s",
00537                                sqlite3_errmsg(queue->db));
00538     }
00539 
00540     return GEARMAN_QUEUE_ERROR;
00541   }
00542 
00543   gearman_log_crazy(server->gearman,
00544                     "sqlite data: priority: %d, unique_key: %s, function_name: %s",
00545                     priority, (char*)unique, (char*)function_name);
00546 
00547   sqlite3_finalize(sth);
00548 
00549   if (_sqlite_commit(server, queue) !=  SQLITE_OK)
00550     return GEARMAN_QUEUE_ERROR;
00551 
00552   return GEARMAN_SUCCESS;
00553 }
00554 
00555 static gearman_return_t _sqlite_flush(gearman_server_st *server,
00556                                       void *context __attribute__((unused)))
00557 {
00558   gearman_log_debug(server->gearman, "sqlite flush");
00559 
00560   return GEARMAN_SUCCESS;
00561 }
00562 
00563 static gearman_return_t _sqlite_done(gearman_server_st *server, void *context,
00564                                      const void *unique,
00565                                      size_t unique_size,
00566                                      const void *function_name __attribute__((unused)),
00567                                      size_t function_name_size __attribute__((unused)))
00568 {
00569   gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)context;
00570   char *query;
00571   size_t query_size;
00572   sqlite3_stmt* sth;
00573 
00574   if (unique_size > UINT32_MAX)
00575   {
00576     gearman_log_error(server->gearman,
00577                              "_sqlite_query", "unique key size too big [%u]",
00578                              (uint32_t)unique_size);
00579     return SQLITE_ERROR;
00580   }
00581 
00582   gearman_log_debug(server->gearman, "sqlite done: %.*s", (uint32_t)unique_size, (char *)unique);
00583 
00584     if (_sqlite_lock(server, queue) !=  SQLITE_OK)
00585       return GEARMAN_QUEUE_ERROR;
00586 
00587   query_size= (unique_size * 2) + GEARMAN_QUEUE_QUERY_BUFFER;
00588   if (query_size > queue->query_size)
00589   {
00590     query= (char *)realloc(queue->query, query_size);
00591     if (query == NULL)
00592     {
00593       gearman_log_error(server->gearman, "_sqlite_add", "realloc");
00594       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00595     }
00596 
00597     queue->query= query;
00598     queue->query_size= query_size;
00599   }
00600   else
00601     query= queue->query;
00602 
00603   query_size= (size_t)snprintf(query, query_size,
00604                                "DELETE FROM %s WHERE unique_key=?",
00605                                queue->table);
00606 
00607   if (_sqlite_query(server, queue, query, query_size, &sth) != SQLITE_OK)
00608     return GEARMAN_QUEUE_ERROR;
00609 
00610   sqlite3_bind_text(sth, 1, unique, (int)unique_size, SQLITE_TRANSIENT);
00611 
00612   if (sqlite3_step(sth) != SQLITE_DONE)
00613   {
00614     gearman_log_error(server->gearman, "_sqlite_done", "delete error: %s",
00615                              sqlite3_errmsg(queue->db));
00616     sqlite3_finalize(sth);
00617     return GEARMAN_QUEUE_ERROR;
00618   }
00619 
00620   sqlite3_finalize(sth);
00621 
00622   if (_sqlite_commit(server, queue) !=  SQLITE_OK)
00623     return GEARMAN_QUEUE_ERROR;
00624 
00625   return GEARMAN_SUCCESS;
00626 }
00627 
00628 static gearman_return_t _sqlite_replay(gearman_server_st *server, void *context,
00629                                        gearman_queue_add_fn *add_fn,
00630                                        void *add_context)
00631 {
00632   gearman_queue_sqlite_st *queue= (gearman_queue_sqlite_st *)context;
00633   char *query;
00634   size_t query_size;
00635   sqlite3_stmt* sth;
00636   gearman_return_t gret;
00637 
00638   gearman_log_info(server->gearman, "sqlite replay start");
00639 
00640   if (GEARMAN_QUEUE_QUERY_BUFFER > queue->query_size)
00641   {
00642     query= (char *)realloc(queue->query, GEARMAN_QUEUE_QUERY_BUFFER);
00643     if (query == NULL)
00644     {
00645       gearman_log_error(server->gearman, "_sqlite_replay", "realloc");
00646       return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00647     }
00648 
00649     queue->query= query;
00650     queue->query_size= GEARMAN_QUEUE_QUERY_BUFFER;
00651   }
00652   else
00653     query= queue->query;
00654 
00655   query_size= (size_t)snprintf(query, GEARMAN_QUEUE_QUERY_BUFFER,
00656                                "SELECT unique_key,function_name,priority,data "
00657                                "FROM %s",
00658                                queue->table);
00659 
00660   if (_sqlite_query(server, queue, query, query_size, &sth) != SQLITE_OK)
00661     return GEARMAN_QUEUE_ERROR;
00662   while (sqlite3_step(sth) == SQLITE_ROW)
00663   {
00664     const void *unique, *function_name;
00665     void *data;
00666     size_t unique_size, function_name_size, data_size;
00667     gearman_job_priority_t priority;
00668 
00669     if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
00670     {
00671       unique= sqlite3_column_text(sth,0);
00672       unique_size= (size_t) sqlite3_column_bytes(sth,0);
00673     }
00674     else
00675     {
00676       sqlite3_finalize(sth);
00677       gearman_log_error(server->gearman, "_sqlite_replay",
00678                                "column %d is not type TEXT", 0);
00679       return GEARMAN_QUEUE_ERROR;
00680     }
00681 
00682     if (sqlite3_column_type(sth,1) == SQLITE_TEXT)
00683     {
00684       function_name= sqlite3_column_text(sth,1);
00685       function_name_size= (size_t)sqlite3_column_bytes(sth,1);
00686     }
00687     else
00688     {
00689       sqlite3_finalize(sth);
00690       gearman_log_error(server->gearman, "_sqlite_replay",
00691                                "column %d is not type TEXT", 1);
00692       return GEARMAN_QUEUE_ERROR;
00693     }
00694 
00695     if (sqlite3_column_type(sth,2) == SQLITE_INTEGER)
00696     {
00697       priority= (double)sqlite3_column_int64(sth,2);
00698     }
00699     else
00700     {
00701       sqlite3_finalize(sth);
00702       gearman_log_error(server->gearman, "_sqlite_replay",
00703                                "column %d is not type INTEGER", 2);
00704       return GEARMAN_QUEUE_ERROR;
00705     }
00706 
00707     if (sqlite3_column_type(sth,3) == SQLITE_BLOB)
00708     {
00709       data_size= (size_t)sqlite3_column_bytes(sth,3);
00710       /* need to make a copy here ... gearman_server_job_free will free it later */
00711       data= (void *)malloc(data_size);
00712       if (data == NULL)
00713       {
00714         sqlite3_finalize(sth);
00715         gearman_log_error(server->gearman, "_sqlite_replay", "malloc");
00716         return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00717       }
00718       memcpy(data, sqlite3_column_blob(sth,3), data_size);
00719     }
00720     else
00721     {
00722       sqlite3_finalize(sth);
00723       gearman_log_error(server->gearman, "_sqlite_replay", "column %d is not type TEXT", 3);
00724       return GEARMAN_QUEUE_ERROR;
00725     }
00726 
00727     gearman_log_debug(server->gearman, "sqlite replay: %s", (char*)function_name);
00728 
00729     gret= (*add_fn)(server, add_context,
00730                     unique, unique_size,
00731                     function_name, function_name_size,
00732                     data, data_size,
00733                     priority);
00734     if (gret != GEARMAN_SUCCESS)
00735     {
00736       sqlite3_finalize(sth);
00737       return gret;
00738     }
00739   }
00740 
00741   sqlite3_finalize(sth);
00742 
00743   return GEARMAN_SUCCESS;
00744 }