00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 static uint32_t _server_job_hash(const char *key, size_t key_size);
00030
00035 static gearman_server_job_st *
00036 _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
00037 gearman_server_function_st *server_function,
00038 const char *unique, size_t data_size);
00039
00042
00043
00044
00045
00046 gearman_server_job_st *
00047 gearman_server_job_add(gearman_server_st *server, const char *function_name,
00048 size_t function_name_size, const char *unique,
00049 size_t unique_size, const void *data, size_t data_size,
00050 gearman_job_priority_t priority,
00051 gearman_server_client_st *server_client,
00052 gearman_return_t *ret_ptr)
00053 {
00054 gearman_server_job_st *server_job;
00055 gearman_server_function_st *server_function;
00056 uint32_t key;
00057
00058 server_function= gearman_server_function_get(server, function_name,
00059 function_name_size);
00060 if (server_function == NULL)
00061 {
00062 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00063 return NULL;
00064 }
00065
00066 if (unique_size == 0)
00067 {
00068 server_job= NULL;
00069 key= 0;
00070 }
00071 else
00072 {
00073 if (unique_size == 1 && *unique == '-')
00074 {
00075 if (data_size == 0)
00076 {
00077 key= 0;
00078 server_job= NULL;
00079 }
00080 else
00081 {
00082
00083 key= _server_job_hash(data, data_size);
00084 server_job= _server_job_get_unique(server, key, server_function, data,
00085 data_size);
00086 }
00087 }
00088 else
00089 {
00090
00091 key= _server_job_hash(unique, unique_size);
00092 server_job= _server_job_get_unique(server, key, server_function, unique,
00093 0);
00094 }
00095 }
00096
00097 if (server_job == NULL)
00098 {
00099 if (server_function->max_queue_size > 0 &&
00100 server_function->job_total >= server_function->max_queue_size)
00101 {
00102 *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
00103 return NULL;
00104 }
00105
00106 server_job= gearman_server_job_create(server, NULL);
00107 if (server_job == NULL)
00108 {
00109 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00110 return NULL;
00111 }
00112
00113 server_job->priority= priority;
00114
00115 server_job->function= server_function;
00116 server_function->job_total++;
00117
00118 snprintf(server_job->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s:%u",
00119 server->job_handle_prefix, server->job_handle_count);
00120 snprintf(server_job->unique, GEARMAN_UNIQUE_SIZE, "%.*s",
00121 (uint32_t)unique_size, unique);
00122 server->job_handle_count++;
00123 server_job->data= data;
00124 server_job->data_size= data_size;
00125
00126 server_job->unique_key= key;
00127 key= key % GEARMAN_JOB_HASH_SIZE;
00128 GEARMAN_HASH_ADD(server->unique, key, server_job, unique_)
00129
00130 key= _server_job_hash(server_job->job_handle,
00131 strlen(server_job->job_handle));
00132 server_job->job_handle_key= key;
00133 key= key % GEARMAN_JOB_HASH_SIZE;
00134 GEARMAN_HASH_ADD(server->job, key, server_job,)
00135
00136 if (server->options & GEARMAN_SERVER_QUEUE_REPLAY)
00137 server_job->options|= GEARMAN_SERVER_JOB_QUEUED;
00138 else if (server_client == NULL && server->queue_add_fn != NULL)
00139 {
00140 *ret_ptr= (*(server->queue_add_fn))(server,
00141 (void *)server->queue_context,
00142 server_job->unique,
00143 unique_size,
00144 function_name,
00145 function_name_size,
00146 data, data_size, priority);
00147 if (*ret_ptr != GEARMAN_SUCCESS)
00148 {
00149 server_job->data= NULL;
00150 gearman_server_job_free(server_job);
00151 return NULL;
00152 }
00153
00154 if (server->queue_flush_fn != NULL)
00155 {
00156 *ret_ptr= (*(server->queue_flush_fn))(server,
00157 (void *)server->queue_context);
00158 if (*ret_ptr != GEARMAN_SUCCESS)
00159 {
00160 server_job->data= NULL;
00161 gearman_server_job_free(server_job);
00162 return NULL;
00163 }
00164 }
00165
00166 server_job->options|= GEARMAN_SERVER_JOB_QUEUED;
00167 }
00168
00169 *ret_ptr= gearman_server_job_queue(server_job);
00170 if (*ret_ptr != GEARMAN_SUCCESS)
00171 {
00172 if (server_client == NULL && server->queue_done_fn != NULL)
00173 {
00174
00175 (void)(*(server->queue_done_fn))(server,
00176 (void *)server->queue_context,
00177 server_job->unique, unique_size,
00178 server_job->function->function_name,
00179 server_job->function->function_name_size);
00180 }
00181
00182 gearman_server_job_free(server_job);
00183 return NULL;
00184 }
00185 }
00186 else
00187 *ret_ptr= GEARMAN_JOB_EXISTS;
00188
00189 if (server_client != NULL)
00190 {
00191 server_client->job= server_job;
00192 GEARMAN_LIST_ADD(server_job->client, server_client, job_)
00193 }
00194
00195 return server_job;
00196 }
00197
00198 gearman_server_job_st *
00199 gearman_server_job_create(gearman_server_st *server,
00200 gearman_server_job_st *server_job)
00201 {
00202 if (server_job == NULL)
00203 {
00204 if (server->free_job_count > 0)
00205 {
00206 server_job= server->free_job_list;
00207 GEARMAN_LIST_DEL(server->free_job, server_job,)
00208 }
00209 else
00210 {
00211 server_job= malloc(sizeof(gearman_server_job_st));
00212 if (server_job == NULL)
00213 return NULL;
00214 }
00215
00216 server_job->options= GEARMAN_SERVER_JOB_ALLOCATED;
00217 }
00218 else
00219 server_job->options= 0;
00220
00221 server_job->retries= 0;
00222 server_job->priority= 0;
00223 server_job->job_handle_key= 0;
00224 server_job->unique_key= 0;
00225 server_job->client_count= 0;
00226 server_job->numerator= 0;
00227 server_job->denominator= 0;
00228 server_job->data_size= 0;
00229 server_job->server= server;
00230 server_job->next= NULL;
00231 server_job->prev= NULL;
00232 server_job->unique_next= NULL;
00233 server_job->unique_prev= NULL;
00234 server_job->worker_next= NULL;
00235 server_job->worker_prev= NULL;
00236 server_job->function= NULL;
00237 server_job->function_next= NULL;
00238 server_job->data= NULL;
00239 server_job->client_list= NULL;
00240 server_job->worker= NULL;
00241 server_job->job_handle[0]= 0;
00242 server_job->unique[0]= 0;
00243
00244 return server_job;
00245 }
00246
00247 void gearman_server_job_free(gearman_server_job_st *server_job)
00248 {
00249 uint32_t key;
00250
00251 if (server_job->worker != NULL)
00252 server_job->function->job_running--;
00253
00254 server_job->function->job_total--;
00255
00256 if (server_job->data != NULL)
00257 free((void *)(server_job->data));
00258
00259 while (server_job->client_list != NULL)
00260 gearman_server_client_free(server_job->client_list);
00261
00262 if (server_job->worker != NULL)
00263 GEARMAN_LIST_DEL(server_job->worker->job, server_job, worker_)
00264
00265 key= server_job->unique_key % GEARMAN_JOB_HASH_SIZE;
00266 GEARMAN_HASH_DEL(server_job->server->unique, key, server_job, unique_)
00267
00268 key= server_job->job_handle_key % GEARMAN_JOB_HASH_SIZE;
00269 GEARMAN_HASH_DEL(server_job->server->job, key, server_job,)
00270
00271 if (server_job->options & GEARMAN_SERVER_JOB_ALLOCATED)
00272 {
00273 if (server_job->server->free_job_count < GEARMAN_MAX_FREE_SERVER_JOB)
00274 GEARMAN_LIST_ADD(server_job->server->free_job, server_job,)
00275 else
00276 free(server_job);
00277 }
00278 }
00279
00280 gearman_server_job_st *gearman_server_job_get(gearman_server_st *server,
00281 const char *job_handle)
00282 {
00283 gearman_server_job_st *server_job;
00284 uint32_t key;
00285
00286 key= _server_job_hash(job_handle, strlen(job_handle));
00287
00288 for (server_job= server->job_hash[key % GEARMAN_JOB_HASH_SIZE];
00289 server_job != NULL; server_job= server_job->next)
00290 {
00291 if (server_job->job_handle_key == key &&
00292 !strcmp(server_job->job_handle, job_handle))
00293 {
00294 return server_job;
00295 }
00296 }
00297
00298 return NULL;
00299 }
00300
00301 gearman_server_job_st *
00302 gearman_server_job_peek(gearman_server_con_st *server_con)
00303 {
00304 gearman_server_worker_st *server_worker;
00305 gearman_job_priority_t priority;
00306
00307 for (server_worker= server_con->worker_list; server_worker != NULL;
00308 server_worker= server_worker->con_next)
00309 {
00310 if (server_worker->function->job_count != 0)
00311 {
00312 for (priority= GEARMAN_JOB_PRIORITY_HIGH;
00313 priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
00314 {
00315 if (server_worker->function->job_list[priority] != NULL)
00316 {
00317 if (server_worker->function->job_list[priority]->options &
00318 GEARMAN_SERVER_JOB_IGNORE)
00319 {
00320
00321
00322 server_worker->function->job_list[priority]->options&=
00323 (gearman_server_job_options_t)~GEARMAN_SERVER_JOB_IGNORE;
00324 gearman_server_job_free(gearman_server_job_take(server_con));
00325 return gearman_server_job_peek(server_con);
00326 }
00327 return server_worker->function->job_list[priority];
00328 }
00329 }
00330 }
00331 }
00332
00333 return NULL;
00334 }
00335
00336 gearman_server_job_st *
00337 gearman_server_job_take(gearman_server_con_st *server_con)
00338 {
00339 gearman_server_worker_st *server_worker;
00340 gearman_server_job_st *server_job;
00341 gearman_job_priority_t priority;
00342
00343 for (server_worker= server_con->worker_list; server_worker != NULL;
00344 server_worker= server_worker->con_next)
00345 {
00346 if (server_worker->function->job_count != 0)
00347 break;
00348 }
00349
00350 if (server_worker == NULL)
00351 return NULL;
00352
00353 for (priority= GEARMAN_JOB_PRIORITY_HIGH;
00354 priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
00355 {
00356 if (server_worker->function->job_list[priority] != NULL)
00357 break;
00358 }
00359
00360 server_job= server_worker->function->job_list[priority];
00361 server_job->function->job_list[priority]= server_job->function_next;
00362 if (server_job->function->job_end[priority] == server_job)
00363 server_job->function->job_end[priority]= NULL;
00364 server_job->function->job_count--;
00365
00366 server_job->worker= server_worker;
00367 GEARMAN_LIST_ADD(server_worker->job, server_job, worker_)
00368 server_job->function->job_running++;
00369
00370 if (server_job->options & GEARMAN_SERVER_JOB_IGNORE)
00371 {
00372 gearman_server_job_free(server_job);
00373 return gearman_server_job_take(server_con);
00374 }
00375
00376 return server_job;
00377 }
00378
00379 gearman_return_t gearman_server_job_queue(gearman_server_job_st *job)
00380 {
00381 gearman_server_client_st *client;
00382 gearman_server_worker_st *worker;
00383 gearman_return_t ret;
00384
00385 if (job->worker != NULL)
00386 {
00387 job->retries++;
00388 if (job->server->job_retries == job->retries)
00389 {
00390 GEARMAN_SERVER_ERROR(job->server,
00391 "Dropped job due to max retry count: %s %s",
00392 job->job_handle, job->unique);
00393 for (client= job->client_list; client != NULL; client= client->job_next)
00394 {
00395 ret= gearman_server_io_packet_add(client->con, false,
00396 GEARMAN_MAGIC_RESPONSE,
00397 GEARMAN_COMMAND_WORK_FAIL,
00398 job->job_handle,
00399 (size_t)strlen(job->job_handle),
00400 NULL);
00401 if (ret != GEARMAN_SUCCESS)
00402 return ret;
00403 }
00404
00405 gearman_server_job_free(job);
00406 return GEARMAN_SUCCESS;
00407 }
00408
00409 GEARMAN_LIST_DEL(job->worker->job, job, worker_)
00410 job->worker= NULL;
00411 job->function->job_running--;
00412 job->function_next= NULL;
00413 job->numerator= 0;
00414 job->denominator= 0;
00415 }
00416
00417
00418 for (worker= job->function->worker_list; worker != NULL;
00419 worker= worker->function_next)
00420 {
00421 if (!(worker->con->options & GEARMAN_SERVER_CON_SLEEPING) ||
00422 worker->con->options & GEARMAN_SERVER_CON_NOOP_SENT)
00423 {
00424 continue;
00425 }
00426
00427 ret= gearman_server_io_packet_add(worker->con, false,
00428 GEARMAN_MAGIC_RESPONSE,
00429 GEARMAN_COMMAND_NOOP, NULL);
00430 if (ret != GEARMAN_SUCCESS)
00431 return ret;
00432
00433 worker->con->options|= GEARMAN_SERVER_CON_NOOP_SENT;
00434 }
00435
00436
00437 if (job->function->job_list[job->priority] == NULL)
00438 job->function->job_list[job->priority]= job;
00439 else
00440 job->function->job_end[job->priority]->function_next= job;
00441 job->function->job_end[job->priority]= job;
00442 job->function->job_count++;
00443
00444 return GEARMAN_SUCCESS;
00445 }
00446
00447
00448
00449
00450
00451 static uint32_t _server_job_hash(const char *key, size_t key_size)
00452 {
00453 const char *ptr= key;
00454 int32_t value= 0;
00455
00456 while (key_size--)
00457 {
00458 value += (int32_t)*ptr++;
00459 value += (value << 10);
00460 value ^= (value >> 6);
00461 }
00462 value += (value << 3);
00463 value ^= (value >> 11);
00464 value += (value << 15);
00465
00466 return (uint32_t)(value == 0 ? 1 : value);
00467 }
00468
00469 static gearman_server_job_st *
00470 _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
00471 gearman_server_function_st *server_function,
00472 const char *unique, size_t data_size)
00473 {
00474 gearman_server_job_st *server_job;
00475
00476 for (server_job= server->unique_hash[unique_key % GEARMAN_JOB_HASH_SIZE];
00477 server_job != NULL; server_job= server_job->unique_next)
00478 {
00479 if (data_size == 0)
00480 {
00481 if (server_job->function == server_function &&
00482 server_job->unique_key == unique_key &&
00483 !strcmp(server_job->unique, unique))
00484 {
00485 return server_job;
00486 }
00487 }
00488 else
00489 {
00490 if (server_job->function == server_function &&
00491 server_job->unique_key == unique_key &&
00492 server_job->data_size == data_size &&
00493 !memcmp(server_job->data, unique, data_size))
00494 {
00495 return server_job;
00496 }
00497 }
00498 }
00499
00500 return NULL;
00501 }