00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00029 static uint32_t _server_job_hash(const char *key, size_t key_size);
00030
00034 static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
00035 gearman_server_worker_st *worker);
00036
00041 static gearman_server_job_st *
00042 _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
00043 gearman_server_function_st *server_function,
00044 const char *unique, size_t data_size);
00045
00048
00049
00050
00051
00052 gearman_server_job_st *
00053 gearman_server_job_add(gearman_server_st *server, const char *function_name,
00054 size_t function_name_size, const char *unique,
00055 size_t unique_size, const void *data, size_t data_size,
00056 gearman_job_priority_t priority,
00057 gearman_server_client_st *server_client,
00058 gearman_return_t *ret_ptr)
00059 {
00060 gearman_server_job_st *server_job;
00061 gearman_server_function_st *server_function;
00062 uint32_t key;
00063
00064 server_function= gearman_server_function_get(server, function_name,
00065 function_name_size);
00066 if (server_function == NULL)
00067 {
00068 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00069 return NULL;
00070 }
00071
00072 if (unique_size == 0)
00073 {
00074 server_job= NULL;
00075 key= 0;
00076 }
00077 else
00078 {
00079 if (unique_size == 1 && *unique == '-')
00080 {
00081 if (data_size == 0)
00082 {
00083 key= 0;
00084 server_job= NULL;
00085 }
00086 else
00087 {
00088
00089 key= _server_job_hash(data, data_size);
00090 server_job= _server_job_get_unique(server, key, server_function, data,
00091 data_size);
00092 }
00093 }
00094 else
00095 {
00096
00097 key= _server_job_hash(unique, unique_size);
00098 server_job= _server_job_get_unique(server, key, server_function, unique,
00099 0);
00100 }
00101 }
00102
00103 if (server_job == NULL)
00104 {
00105 if (server_function->max_queue_size > 0 &&
00106 server_function->job_total >= server_function->max_queue_size)
00107 {
00108 *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
00109 return NULL;
00110 }
00111
00112 server_job= gearman_server_job_create(server, NULL);
00113 if (server_job == NULL)
00114 {
00115 *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
00116 return NULL;
00117 }
00118
00119 server_job->priority= priority;
00120
00121 server_job->function= server_function;
00122 server_function->job_total++;
00123
00124 snprintf(server_job->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%s:%u",
00125 server->job_handle_prefix, server->job_handle_count);
00126 snprintf(server_job->unique, GEARMAN_UNIQUE_SIZE, "%.*s",
00127 (uint32_t)unique_size, unique);
00128 server->job_handle_count++;
00129 server_job->data= data;
00130 server_job->data_size= data_size;
00131
00132 server_job->unique_key= key;
00133 key= key % GEARMAN_JOB_HASH_SIZE;
00134 GEARMAN_HASH_ADD(server->unique, key, server_job, unique_);
00135
00136 key= _server_job_hash(server_job->job_handle,
00137 strlen(server_job->job_handle));
00138 server_job->job_handle_key= key;
00139 key= key % GEARMAN_JOB_HASH_SIZE;
00140 GEARMAN_HASH_ADD(server->job, key, server_job,);
00141
00142 if (server->state.queue_startup)
00143 {
00144 server_job->job_queued= true;
00145 }
00146 else if (server_client == NULL && server->queue_add_fn != NULL)
00147 {
00148 *ret_ptr= (*(server->queue_add_fn))(server,
00149 (void *)server->queue_context,
00150 server_job->unique,
00151 unique_size,
00152 function_name,
00153 function_name_size,
00154 data, data_size, priority);
00155 if (*ret_ptr != GEARMAN_SUCCESS)
00156 {
00157 server_job->data= NULL;
00158 gearman_server_job_free(server_job);
00159 return NULL;
00160 }
00161
00162 if (server->queue_flush_fn != NULL)
00163 {
00164 *ret_ptr= (*(server->queue_flush_fn))(server,
00165 (void *)server->queue_context);
00166 if (*ret_ptr != GEARMAN_SUCCESS)
00167 {
00168 server_job->data= NULL;
00169 gearman_server_job_free(server_job);
00170 return NULL;
00171 }
00172 }
00173
00174 server_job->job_queued= true;
00175 }
00176
00177 *ret_ptr= gearman_server_job_queue(server_job);
00178 if (*ret_ptr != GEARMAN_SUCCESS)
00179 {
00180 if (server_client == NULL && server->queue_done_fn != NULL)
00181 {
00182
00183 (void)(*(server->queue_done_fn))(server,
00184 (void *)server->queue_context,
00185 server_job->unique, unique_size,
00186 server_job->function->function_name,
00187 server_job->function->function_name_size);
00188 }
00189
00190 gearman_server_job_free(server_job);
00191 return NULL;
00192 }
00193 }
00194 else
00195 *ret_ptr= GEARMAN_JOB_EXISTS;
00196
00197 if (server_client != NULL)
00198 {
00199 server_client->job= server_job;
00200 GEARMAN_LIST_ADD(server_job->client, server_client, job_)
00201 }
00202
00203 return server_job;
00204 }
00205
00206 gearman_server_job_st *
00207 gearman_server_job_create(gearman_server_st *server,
00208 gearman_server_job_st *server_job)
00209 {
00210 if (server_job == NULL)
00211 {
00212 if (server->free_job_count > 0)
00213 {
00214 server_job= server->free_job_list;
00215 GEARMAN_LIST_DEL(server->free_job, server_job,)
00216 }
00217 else
00218 {
00219 server_job= (gearman_server_job_st *)malloc(sizeof(gearman_server_job_st));
00220 if (server_job == NULL)
00221 return NULL;
00222 }
00223
00224 server_job->options.allocated= true;
00225 }
00226 else
00227 server_job->options.allocated= false;
00228
00229 server_job->ignore_job= false;
00230 server_job->job_queued= false;
00231 server_job->retries= 0;
00232 server_job->priority= 0;
00233 server_job->job_handle_key= 0;
00234 server_job->unique_key= 0;
00235 server_job->client_count= 0;
00236 server_job->numerator= 0;
00237 server_job->denominator= 0;
00238 server_job->data_size= 0;
00239 server_job->server= server;
00240 server_job->next= NULL;
00241 server_job->prev= NULL;
00242 server_job->unique_next= NULL;
00243 server_job->unique_prev= NULL;
00244 server_job->worker_next= NULL;
00245 server_job->worker_prev= NULL;
00246 server_job->function= NULL;
00247 server_job->function_next= NULL;
00248 server_job->data= NULL;
00249 server_job->client_list= NULL;
00250 server_job->worker= NULL;
00251 server_job->job_handle[0]= 0;
00252 server_job->unique[0]= 0;
00253
00254 return server_job;
00255 }
00256
00257 void gearman_server_job_free(gearman_server_job_st *server_job)
00258 {
00259 uint32_t key;
00260
00261 if (server_job->worker != NULL)
00262 server_job->function->job_running--;
00263
00264 server_job->function->job_total--;
00265
00266 if (server_job->data != NULL)
00267 free((void *)(server_job->data));
00268
00269 while (server_job->client_list != NULL)
00270 gearman_server_client_free(server_job->client_list);
00271
00272 if (server_job->worker != NULL)
00273 GEARMAN_LIST_DEL(server_job->worker->job, server_job, worker_)
00274
00275 key= server_job->unique_key % GEARMAN_JOB_HASH_SIZE;
00276 GEARMAN_HASH_DEL(server_job->server->unique, key, server_job, unique_);
00277
00278 key= server_job->job_handle_key % GEARMAN_JOB_HASH_SIZE;
00279 GEARMAN_HASH_DEL(server_job->server->job, key, server_job,);
00280
00281 if (server_job->options.allocated)
00282 {
00283 if (server_job->server->free_job_count < GEARMAN_MAX_FREE_SERVER_JOB)
00284 GEARMAN_LIST_ADD(server_job->server->free_job, server_job,)
00285 else
00286 free(server_job);
00287 }
00288 }
00289
00290 gearman_server_job_st *gearman_server_job_get(gearman_server_st *server,
00291 const char *job_handle,
00292 gearman_server_con_st *worker_con)
00293 {
00294 uint32_t key;
00295
00296 key= _server_job_hash(job_handle, strlen(job_handle));
00297
00298 for (gearman_server_job_st *server_job= server->job_hash[key % GEARMAN_JOB_HASH_SIZE];
00299 server_job != NULL; server_job= server_job->next)
00300 {
00301 if (server_job->job_handle_key == key &&
00302 !strcmp(server_job->job_handle, job_handle))
00303 {
00304
00305 if (worker_con != NULL &&
00306 (server_job->worker == NULL || server_job->worker->con != worker_con))
00307 {
00308 return NULL;
00309 }
00310
00311 return server_job;
00312 }
00313 }
00314
00315 return NULL;
00316 }
00317
00318 gearman_server_job_st *
00319 gearman_server_job_peek(gearman_server_con_st *server_con)
00320 {
00321 gearman_server_worker_st *server_worker;
00322 gearman_job_priority_t priority;
00323
00324 for (server_worker= server_con->worker_list; server_worker != NULL;
00325 server_worker= server_worker->con_next)
00326 {
00327 if (server_worker->function->job_count != 0)
00328 {
00329 for (priority= GEARMAN_JOB_PRIORITY_HIGH;
00330 priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
00331 {
00332 if (server_worker->function->job_list[priority] != NULL)
00333 {
00334 if (server_worker->function->job_list[priority]->ignore_job)
00335 {
00336
00337
00338 server_worker->function->job_list[priority]->ignore_job= false;
00339
00340 gearman_server_job_free(gearman_server_job_take(server_con));
00341
00342 return gearman_server_job_peek(server_con);
00343 }
00344 return server_worker->function->job_list[priority];
00345 }
00346 }
00347 }
00348 }
00349
00350 return NULL;
00351 }
00352
00353 static inline void _server_con_worker_list_append(gearman_server_worker_st *list,
00354 gearman_server_worker_st *worker)
00355 {
00356 worker->con_prev= NULL;
00357 worker->con_next= list;
00358 while (worker->con_next != NULL)
00359 {
00360 worker->con_prev= worker->con_next;
00361 worker->con_next= worker->con_next->con_next;
00362 }
00363 if (worker->con_prev)
00364 worker->con_prev->con_next= worker;
00365 }
00366
00367 gearman_server_job_st *
00368 gearman_server_job_take(gearman_server_con_st *server_con)
00369 {
00370 gearman_server_worker_st *server_worker;
00371 gearman_server_job_st *server_job;
00372 gearman_job_priority_t priority;
00373
00374 for (server_worker= server_con->worker_list; server_worker != NULL;
00375 server_worker= server_worker->con_next)
00376 {
00377 if (server_worker->function->job_count != 0)
00378 break;
00379 }
00380
00381 if (server_worker == NULL)
00382 return NULL;
00383
00384 if (server_con->thread->server->flags.round_robin)
00385 {
00386 GEARMAN_LIST_DEL(server_con->worker, server_worker, con_)
00387 _server_con_worker_list_append(server_con->worker_list, server_worker);
00388 ++server_con->worker_count;
00389 if (server_con->worker_list == NULL) {
00390 server_con->worker_list= server_worker;
00391 }
00392 }
00393
00394 for (priority= GEARMAN_JOB_PRIORITY_HIGH;
00395 priority != GEARMAN_JOB_PRIORITY_MAX; priority++)
00396 {
00397 if (server_worker->function->job_list[priority] != NULL)
00398 break;
00399 }
00400
00401 server_job= server_worker->function->job_list[priority];
00402 server_job->function->job_list[priority]= server_job->function_next;
00403 if (server_job->function->job_end[priority] == server_job)
00404 server_job->function->job_end[priority]= NULL;
00405 server_job->function->job_count--;
00406
00407 server_job->worker= server_worker;
00408 GEARMAN_LIST_ADD(server_worker->job, server_job, worker_)
00409 server_job->function->job_running++;
00410
00411 if (server_job->ignore_job)
00412 {
00413 gearman_server_job_free(server_job);
00414 return gearman_server_job_take(server_con);
00415 }
00416
00417 return server_job;
00418 }
00419
00420 gearman_return_t gearman_server_job_queue(gearman_server_job_st *job)
00421 {
00422 gearman_server_client_st *client;
00423 gearman_server_worker_st *worker;
00424 uint32_t noop_sent;
00425 gearman_return_t ret;
00426
00427 if (job->worker != NULL)
00428 {
00429 job->retries++;
00430 if (job->server->job_retries == job->retries)
00431 {
00432 gearman_log_error(job->server->gearman,
00433 "Dropped job due to max retry count: %s %s",
00434 job->job_handle, job->unique);
00435 for (client= job->client_list; client != NULL; client= client->job_next)
00436 {
00437 ret= gearman_server_io_packet_add(client->con, false,
00438 GEARMAN_MAGIC_RESPONSE,
00439 GEARMAN_COMMAND_WORK_FAIL,
00440 job->job_handle,
00441 (size_t)strlen(job->job_handle),
00442 NULL);
00443 if (ret != GEARMAN_SUCCESS)
00444 return ret;
00445 }
00446
00447
00448 if (job->job_queued && job->server->queue_done_fn != NULL)
00449 {
00450 ret= (*(job->server->queue_done_fn))(job->server,
00451 (void *)job->server->queue_context,
00452 job->unique,
00453 (size_t)strlen(job->unique),
00454 job->function->function_name,
00455 job->function->function_name_size);
00456 if (ret != GEARMAN_SUCCESS)
00457 return ret;
00458 }
00459
00460 gearman_server_job_free(job);
00461 return GEARMAN_SUCCESS;
00462 }
00463
00464 GEARMAN_LIST_DEL(job->worker->job, job, worker_)
00465 job->worker= NULL;
00466 job->function->job_running--;
00467 job->function_next= NULL;
00468 job->numerator= 0;
00469 job->denominator= 0;
00470 }
00471
00472
00473 if (job->function->worker_list != NULL)
00474 {
00475 worker= job->function->worker_list;
00476 noop_sent= 0;
00477 do
00478 {
00479 if (worker->con->is_sleeping && ! (worker->con->is_noop_sent))
00480 {
00481 ret= gearman_server_io_packet_add(worker->con, false,
00482 GEARMAN_MAGIC_RESPONSE,
00483 GEARMAN_COMMAND_NOOP, NULL);
00484 if (ret != GEARMAN_SUCCESS)
00485 return ret;
00486
00487 worker->con->is_noop_sent= true;
00488 noop_sent++;
00489 }
00490
00491 worker= worker->function_next;
00492 }
00493 while (worker != job->function->worker_list &&
00494 (job->server->worker_wakeup == 0 ||
00495 noop_sent < job->server->worker_wakeup));
00496
00497 job->function->worker_list= worker;
00498 }
00499
00500
00501 if (job->function->job_list[job->priority] == NULL)
00502 job->function->job_list[job->priority]= job;
00503 else
00504 job->function->job_end[job->priority]->function_next= job;
00505 job->function->job_end[job->priority]= job;
00506 job->function->job_count++;
00507
00508 return GEARMAN_SUCCESS;
00509 }
00510
00511
00512
00513
00514
00515 static uint32_t _server_job_hash(const char *key, size_t key_size)
00516 {
00517 const char *ptr= key;
00518 int32_t value= 0;
00519
00520 while (key_size--)
00521 {
00522 value += (int32_t)*ptr++;
00523 value += (value << 10);
00524 value ^= (value >> 6);
00525 }
00526 value += (value << 3);
00527 value ^= (value >> 11);
00528 value += (value << 15);
00529
00530 return (uint32_t)(value == 0 ? 1 : value);
00531 }
00532
00533 static gearman_server_job_st *
00534 _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
00535 gearman_server_function_st *server_function,
00536 const char *unique, size_t data_size)
00537 {
00538 gearman_server_job_st *server_job;
00539
00540 for (server_job= server->unique_hash[unique_key % GEARMAN_JOB_HASH_SIZE];
00541 server_job != NULL; server_job= server_job->unique_next)
00542 {
00543 if (data_size == 0)
00544 {
00545 if (server_job->function == server_function &&
00546 server_job->unique_key == unique_key &&
00547 !strcmp(server_job->unique, unique))
00548 {
00549 return server_job;
00550 }
00551 }
00552 else
00553 {
00554 if (server_job->function == server_function &&
00555 server_job->unique_key == unique_key &&
00556 server_job->data_size == data_size &&
00557 !memcmp(server_job->data, unique, data_size))
00558 {
00559 return server_job;
00560 }
00561 }
00562 }
00563
00564 return NULL;
00565 }