00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015 #include "gearmand.h"
00016
00017
00018
00019
00020
/*
 * Forward declarations of the file-local helpers defined further down.
 */

/* Thread entry point passed to pthread_create(); runs the thread's event loop. */
00027 static void *_thread(void *data);
/* Log callback installed on the server thread; prefixes each line with the thread count. */
00028 static void _log(const char *line, gearman_verbose_t verbose, void *context);
/* Run callback installed on the server thread; sends a GEARMAND_WAKEUP_RUN wakeup. */
00029 static void _run(gearman_server_thread_st *thread, void *fn_arg);
00030
/* Create the wakeup pipe and register its read end with the thread's event base. */
00031 static gearman_return_t _wakeup_init(gearmand_thread_st *thread);
/* Remove the wakeup event and close both ends of the wakeup pipe. */
00032 static void _wakeup_close(gearmand_thread_st *thread);
/* Delete the wakeup event if it is currently registered. */
00033 static void _wakeup_clear(gearmand_thread_st *thread);
/* Event callback for the wakeup pipe; reads wakeup bytes and dispatches them. */
00034 static void _wakeup_event(int fd, short events, void *arg);
/* Remove the wakeup event and free every connection on the thread's dcon_list. */
00035 static void _clear_events(gearmand_thread_st *thread);
00036
00039
00040
00041
00042
00043 gearman_return_t gearmand_thread_create(gearmand_st *gearmand)
00044 {
00045 gearmand_thread_st *thread;
00046 gearman_return_t ret;
00047 int pthread_ret;
00048
00049 thread= (gearmand_thread_st *)malloc(sizeof(gearmand_thread_st));
00050 if (thread == NULL)
00051 {
00052 gearmand_log_fatal(gearmand, "gearmand_thread_create:malloc");
00053 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00054 }
00055
00056 if (gearman_server_thread_create(&(gearmand->server),
00057 &(thread->server_thread)) == NULL)
00058 {
00059 free(thread);
00060 gearmand_log_fatal(gearmand, "gearmand_thread_create:gearman_server_thread_create:NULL");
00061 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00062 }
00063
00064 gearman_server_thread_set_log_fn(&(thread->server_thread), _log, thread,
00065 gearmand->verbose);
00066 gearman_server_thread_set_event_watch(&(thread->server_thread),
00067 gearmand_connection_watch, NULL);
00068
00069 thread->is_thread_lock= false;
00070 thread->is_wakeup_event= false;
00071 thread->count= 0;
00072 thread->dcon_count= 0;
00073 thread->dcon_add_count= 0;
00074 thread->free_dcon_count= 0;
00075 thread->wakeup_fd[0]= -1;
00076 thread->wakeup_fd[1]= -1;
00077 GEARMAN_LIST_ADD(gearmand->thread, thread,)
00078 thread->gearmand= gearmand;
00079 thread->dcon_list= NULL;
00080 thread->dcon_add_list= NULL;
00081 thread->free_dcon_list= NULL;
00082
00083
00084
00085 if (gearmand->threads == 0)
00086 thread->base= gearmand->base;
00087 else
00088 {
00089 gearmand_log_info(gearmand, "Initializing libevent for IO thread");
00090
00091 thread->base= event_base_new();
00092 if (thread->base == NULL)
00093 {
00094 gearmand_thread_free(thread);
00095 gearmand_log_fatal(gearmand, "gearmand_thread_create:event_base_new:NULL");
00096 return GEARMAN_EVENT;
00097 }
00098 }
00099
00100 ret= _wakeup_init(thread);
00101 if (ret != GEARMAN_SUCCESS)
00102 {
00103 gearmand_thread_free(thread);
00104 return ret;
00105 }
00106
00107
00108 if (gearmand->threads == 0)
00109 return GEARMAN_SUCCESS;
00110
00111 thread->count= gearmand->thread_count;
00112
00113 pthread_ret= pthread_mutex_init(&(thread->lock), NULL);
00114 if (pthread_ret != 0)
00115 {
00116 thread->count= 0;
00117 gearmand_thread_free(thread);
00118 gearmand_log_fatal(gearmand, "gearmand_thread_create:pthread_mutex_init:%d", pthread_ret);
00119 return GEARMAN_PTHREAD;
00120 }
00121
00122 thread->is_thread_lock= true;
00123
00124 gearman_server_thread_set_run(&(thread->server_thread), _run, thread);
00125
00126 pthread_ret= pthread_create(&(thread->id), NULL, _thread, thread);
00127 if (pthread_ret != 0)
00128 {
00129 thread->count= 0;
00130 gearmand_thread_free(thread);
00131 gearmand_log_fatal(gearmand, "gearmand_thread_create:pthread_create:%d", pthread_ret);
00132
00133 return GEARMAN_PTHREAD;
00134 }
00135
00136 gearmand_log_info(gearmand, "Thread %u created", thread->count);
00137
00138 return GEARMAN_SUCCESS;
00139 }
00140
00141 void gearmand_thread_free(gearmand_thread_st *thread)
00142 {
00143 gearmand_con_st *dcon;
00144
00145 if (thread->gearmand->threads && thread->count > 0)
00146 {
00147 gearmand_log_info(thread->gearmand, "Shutting down thread %u", thread->count);
00148
00149 gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
00150 (void) pthread_join(thread->id, NULL);
00151 }
00152
00153 if (thread->is_thread_lock)
00154 (void) pthread_mutex_destroy(&(thread->lock));
00155
00156 _wakeup_close(thread);
00157
00158 while (thread->dcon_list != NULL)
00159 gearmand_con_free(thread->dcon_list);
00160
00161 while (thread->dcon_add_list != NULL)
00162 {
00163 dcon= thread->dcon_add_list;
00164 thread->dcon_add_list= dcon->next;
00165 close(dcon->fd);
00166 free(dcon);
00167 }
00168
00169 while (thread->free_dcon_list != NULL)
00170 {
00171 dcon= thread->free_dcon_list;
00172 thread->free_dcon_list= dcon->next;
00173 free(dcon);
00174 }
00175
00176 gearman_server_thread_free(&(thread->server_thread));
00177
00178 GEARMAN_LIST_DEL(thread->gearmand->thread, thread,)
00179
00180 if (thread->gearmand->threads > 0)
00181 {
00182 if (thread->base != NULL)
00183 event_base_free(thread->base);
00184
00185 gearmand_log_info(thread->gearmand, "Thread %u shutdown complete", thread->count);
00186 }
00187
00188 free(thread);
00189 }
00190
00191 void gearmand_thread_wakeup(gearmand_thread_st *thread,
00192 gearmand_wakeup_t wakeup)
00193 {
00194 uint8_t buffer= wakeup;
00195
00196
00197
00198 if (write(thread->wakeup_fd[1], &buffer, 1) != 1)
00199 gearmand_log_error(thread->gearmand, "gearmand_thread_wakeup:write:%d", errno);
00200 }
00201
00202 void gearmand_thread_run(gearmand_thread_st *thread)
00203 {
00204 gearman_server_con_st *server_con;
00205 gearman_return_t ret;
00206 gearmand_con_st *dcon;
00207
00208 while (1)
00209 {
00210 server_con= gearman_server_thread_run(&(thread->server_thread), &ret);
00211 if (ret == GEARMAN_SUCCESS || ret == GEARMAN_IO_WAIT ||
00212 ret == GEARMAN_SHUTDOWN_GRACEFUL)
00213 {
00214 return;
00215 }
00216
00217 if (server_con == NULL)
00218 {
00219
00220
00221 gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00222 return;
00223 }
00224
00225 dcon= (gearmand_con_st *)gearman_server_con_data(server_con);
00226
00227 gearmand_log_info(thread->gearmand, "[%4u] %15s:%5s Disconnected", thread->count, dcon->host, dcon->port);
00228
00229 gearmand_con_free(dcon);
00230 }
00231 }
00232
00233
00234
00235
00236
00237 static void *_thread(void *data)
00238 {
00239 gearmand_thread_st *thread= (gearmand_thread_st *)data;
00240
00241 gearmand_log_info(thread->gearmand, "[%4u] Entering thread event loop", thread->count);
00242
00243 if (event_base_loop(thread->base, 0) == -1)
00244 {
00245 gearmand_log_fatal(thread->gearmand, "_io_thread:event_base_loop:-1");
00246 thread->gearmand->ret= GEARMAN_EVENT;
00247 }
00248
00249 gearmand_log_info(thread->gearmand, "[%4u] Exiting thread event loop", thread->count);
00250
00251 return NULL;
00252 }
00253
00254 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00255 {
00256 gearmand_thread_st *dthread= (gearmand_thread_st *)context;
00257 char buffer[GEARMAN_MAX_ERROR_SIZE];
00258
00259 snprintf(buffer, GEARMAN_MAX_ERROR_SIZE, "[%4u] %s", dthread->count, line);
00260 (*dthread->gearmand->log_fn)(buffer, verbose,
00261 (void *)dthread->gearmand->log_context);
00262 }
00263
00264 static void _run(gearman_server_thread_st *thread __attribute__ ((unused)),
00265 void *fn_arg)
00266 {
00267 gearmand_thread_st *dthread= (gearmand_thread_st *)fn_arg;
00268 gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
00269 }
00270
00271 static gearman_return_t _wakeup_init(gearmand_thread_st *thread)
00272 {
00273 int ret;
00274
00275 gearmand_log_info(thread->gearmand, "Creating IO thread wakeup pipe");
00276
00277 ret= pipe(thread->wakeup_fd);
00278 if (ret == -1)
00279 {
00280 gearmand_log_fatal(thread->gearmand, "_wakeup_init:pipe:%d", errno);
00281 return GEARMAN_ERRNO;
00282 }
00283
00284 ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
00285 if (ret == -1)
00286 {
00287 gearmand_log_fatal(thread->gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno);
00288 return GEARMAN_ERRNO;
00289 }
00290
00291 ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
00292 if (ret == -1)
00293 {
00294 gearmand_log_fatal(thread->gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno);
00295 return GEARMAN_ERRNO;
00296 }
00297
00298 event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
00299 _wakeup_event, thread);
00300 event_base_set(thread->base, &(thread->wakeup_event));
00301
00302 if (event_add(&(thread->wakeup_event), NULL) == -1)
00303 {
00304 gearmand_log_fatal(thread->gearmand, "_wakeup_init:event_add:-1");
00305 return GEARMAN_EVENT;
00306 }
00307
00308 thread->is_wakeup_event= true;
00309
00310 return GEARMAN_SUCCESS;
00311 }
00312
00313 static void _wakeup_close(gearmand_thread_st *thread)
00314 {
00315 _wakeup_clear(thread);
00316
00317 if (thread->wakeup_fd[0] >= 0)
00318 {
00319 gearmand_log_info(thread->gearmand, "Closing IO thread wakeup pipe");
00320 close(thread->wakeup_fd[0]);
00321 thread->wakeup_fd[0]= -1;
00322 close(thread->wakeup_fd[1]);
00323 thread->wakeup_fd[1]= -1;
00324 }
00325 }
00326
00327 static void _wakeup_clear(gearmand_thread_st *thread)
00328 {
00329 if (thread->is_wakeup_event)
00330 {
00331 gearmand_log_info(thread->gearmand, "[%4u] Clearing event for IO thread wakeup pipe", thread->count);
00332 int del_ret= event_del(&(thread->wakeup_event));
00333 assert(del_ret == 0);
00334 thread->is_wakeup_event= false;
00335 }
00336 }
00337
00338 static void _wakeup_event(int fd, short events __attribute__ ((unused)), void *arg)
00339 {
00340 gearmand_thread_st *thread= (gearmand_thread_st *)arg;
00341 uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
00342 ssize_t ret;
00343 ssize_t x;
00344
00345 while (1)
00346 {
00347 ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
00348 if (ret == 0)
00349 {
00350 _clear_events(thread);
00351 gearmand_log_fatal(thread->gearmand, "_wakeup_event:read:EOF");
00352 thread->gearmand->ret= GEARMAN_PIPE_EOF;
00353 return;
00354 }
00355 else if (ret == -1)
00356 {
00357 if (errno == EINTR)
00358 continue;
00359
00360 if (errno == EAGAIN)
00361 break;
00362
00363 _clear_events(thread);
00364 gearmand_log_fatal(thread->gearmand, "_wakeup_event:read:%d", errno);
00365 thread->gearmand->ret= GEARMAN_ERRNO;
00366 return;
00367 }
00368
00369 for (x= 0; x < ret; x++)
00370 {
00371 switch ((gearmand_wakeup_t)buffer[x])
00372 {
00373 case GEARMAND_WAKEUP_PAUSE:
00374 gearmand_log_info(thread->gearmand, "[%4u] Received PAUSE wakeup event", thread->count);
00375 break;
00376
00377 case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
00378 gearmand_log_info(thread->gearmand,
00379 "[%4u] Received SHUTDOWN_GRACEFUL wakeup event",
00380 thread->count);
00381 if (gearman_server_shutdown_graceful(&(thread->gearmand->server)) == GEARMAN_SHUTDOWN)
00382 {
00383 gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00384 }
00385 break;
00386
00387 case GEARMAND_WAKEUP_SHUTDOWN:
00388 gearmand_log_info(thread->gearmand, "[%4u] Received SHUTDOWN wakeup event", thread->count);
00389 _clear_events(thread);
00390 break;
00391
00392 case GEARMAND_WAKEUP_CON:
00393 gearmand_log_info(thread->gearmand, "[%4u] Received CON wakeup event", thread->count);
00394 gearmand_con_check_queue(thread);
00395 break;
00396
00397 case GEARMAND_WAKEUP_RUN:
00398 gearmand_log_debug(thread->gearmand, "[%4u] Received RUN wakeup event", thread->count);
00399 gearmand_thread_run(thread);
00400 break;
00401
00402 default:
00403 gearmand_log_fatal(thread->gearmand, "[%4u] Received unknown wakeup event (%u)", thread->count, buffer[x]);
00404 _clear_events(thread);
00405 thread->gearmand->ret= GEARMAN_UNKNOWN_STATE;
00406 break;
00407 }
00408 }
00409 }
00410 }
00411
00412 static void _clear_events(gearmand_thread_st *thread)
00413 {
00414 _wakeup_clear(thread);
00415
00416 while (thread->dcon_list != NULL)
00417 gearmand_con_free(thread->dcon_list);
00418 }