00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015 #include "gearmand.h"
00016
00017
00018
00019
00020
/* Private helpers for this file; each is defined further down. */

/* Entry point handed to pthread_create() for an I/O thread. */
00027 static void *_thread(void *data);
/* Log callback installed with gearman_server_thread_set_log_fn(). */
00028 static void _log(const char *line, gearman_verbose_t verbose, void *context);
/* Run callback installed with gearman_server_thread_set_run(). */
00029 static void _run(gearman_server_thread_st *thread, void *fn_arg);
00030
/* Create the wakeup pipe and register its read event. */
00031 static gearman_return_t _wakeup_init(gearmand_thread_st *thread);
/* Remove the wakeup event and close both ends of the wakeup pipe. */
00032 static void _wakeup_close(gearmand_thread_st *thread);
/* Remove the wakeup event if it is currently registered. */
00033 static void _wakeup_clear(gearmand_thread_st *thread);
/* libevent callback registered on the read end of the wakeup pipe. */
00034 static void _wakeup_event(int fd, short events, void *arg);
/* Remove the wakeup event and free every connection on dcon_list. */
00035 static void _clear_events(gearmand_thread_st *thread);
00036
00039
00040
00041
00042
00043 gearman_return_t gearmand_thread_create(gearmand_st *gearmand)
00044 {
00045 gearmand_thread_st *thread;
00046 gearman_return_t ret;
00047 int pthread_ret;
00048
00049 thread= malloc(sizeof(gearmand_thread_st));
00050 if (thread == NULL)
00051 {
00052 GEARMAN_FATAL(gearmand, "gearmand_thread_create:malloc")
00053 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00054 }
00055
00056 if (gearman_server_thread_create(&(gearmand->server),
00057 &(thread->server_thread)) == NULL)
00058 {
00059 free(thread);
00060 GEARMAN_FATAL(gearmand,
00061 "gearmand_thread_create:gearman_server_thread_create:NULL")
00062 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00063 }
00064
00065 gearman_server_thread_set_log_fn(&(thread->server_thread), _log, thread,
00066 gearmand->verbose);
00067 gearman_server_thread_set_event_watch(&(thread->server_thread),
00068 gearmand_con_watch, NULL);
00069
00070 thread->options= 0;
00071 thread->count= 0;
00072 thread->dcon_count= 0;
00073 thread->dcon_add_count= 0;
00074 thread->free_dcon_count= 0;
00075 thread->wakeup_fd[0]= -1;
00076 thread->wakeup_fd[1]= -1;
00077 GEARMAN_LIST_ADD(gearmand->thread, thread,)
00078 thread->gearmand= gearmand;
00079 thread->dcon_list= NULL;
00080 thread->dcon_add_list= NULL;
00081 thread->free_dcon_list= NULL;
00082
00083
00084
00085 if (gearmand->threads == 0)
00086 thread->base= gearmand->base;
00087 else
00088 {
00089 GEARMAN_INFO(gearmand, "Initializing libevent for IO thread")
00090
00091 thread->base= event_base_new();
00092 if (thread->base == NULL)
00093 {
00094 gearmand_thread_free(thread);
00095 GEARMAN_FATAL(gearmand, "gearmand_thread_create:event_base_new:NULL")
00096 return GEARMAN_EVENT;
00097 }
00098 }
00099
00100 ret= _wakeup_init(thread);
00101 if (ret != GEARMAN_SUCCESS)
00102 {
00103 gearmand_thread_free(thread);
00104 return ret;
00105 }
00106
00107
00108 if (gearmand->threads == 0)
00109 return GEARMAN_SUCCESS;
00110
00111 thread->count= gearmand->thread_count;
00112
00113 pthread_ret= pthread_mutex_init(&(thread->lock), NULL);
00114 if (pthread_ret != 0)
00115 {
00116 thread->count= 0;
00117 gearmand_thread_free(thread);
00118 GEARMAN_FATAL(gearmand, "gearmand_thread_create:pthread_mutex_init:%d",
00119 pthread_ret)
00120 return GEARMAN_PTHREAD;
00121 }
00122
00123 thread->options|= GEARMAND_THREAD_LOCK;
00124
00125 gearman_server_thread_set_run(&(thread->server_thread), _run, thread);
00126
00127 pthread_ret= pthread_create(&(thread->id), NULL, _thread, thread);
00128 if (pthread_ret != 0)
00129 {
00130 thread->count= 0;
00131 gearmand_thread_free(thread);
00132 GEARMAN_FATAL(gearmand, "gearmand_thread_create:pthread_create:%d",
00133 pthread_ret)
00134 return GEARMAN_PTHREAD;
00135 }
00136
00137 GEARMAN_INFO(gearmand, "Thread %u created", thread->count)
00138
00139 return GEARMAN_SUCCESS;
00140 }
00141
00142 void gearmand_thread_free(gearmand_thread_st *thread)
00143 {
00144 gearmand_con_st *dcon;
00145
00146 if (thread->gearmand->threads && thread->count > 0)
00147 {
00148 GEARMAN_INFO(thread->gearmand, "Shutting down thread %u", thread->count)
00149
00150 gearmand_thread_wakeup(thread, GEARMAND_WAKEUP_SHUTDOWN);
00151 (void) pthread_join(thread->id, NULL);
00152 }
00153
00154 if (thread->options & GEARMAND_THREAD_LOCK)
00155 (void) pthread_mutex_destroy(&(thread->lock));
00156
00157 _wakeup_close(thread);
00158
00159 while (thread->dcon_list != NULL)
00160 gearmand_con_free(thread->dcon_list);
00161
00162 while (thread->dcon_add_list != NULL)
00163 {
00164 dcon= thread->dcon_add_list;
00165 thread->dcon_add_list= dcon->next;
00166 close(dcon->fd);
00167 free(dcon);
00168 }
00169
00170 while (thread->free_dcon_list != NULL)
00171 {
00172 dcon= thread->free_dcon_list;
00173 thread->free_dcon_list= dcon->next;
00174 free(dcon);
00175 }
00176
00177 gearman_server_thread_free(&(thread->server_thread));
00178
00179 GEARMAN_LIST_DEL(thread->gearmand->thread, thread,)
00180
00181 if (thread->gearmand->threads > 0)
00182 {
00183 if (thread->base != NULL)
00184 event_base_free(thread->base);
00185
00186 GEARMAN_INFO(thread->gearmand, "Thread %u shutdown complete", thread->count)
00187 }
00188
00189 free(thread);
00190 }
00191
00192 void gearmand_thread_wakeup(gearmand_thread_st *thread,
00193 gearmand_wakeup_t wakeup)
00194 {
00195 uint8_t buffer= wakeup;
00196
00197
00198
00199 if (write(thread->wakeup_fd[1], &buffer, 1) != 1)
00200 GEARMAN_ERROR(thread->gearmand, "gearmand_thread_wakeup:write:%d", errno)
00201 }
00202
00203 void gearmand_thread_run(gearmand_thread_st *thread)
00204 {
00205 gearman_server_con_st *server_con;
00206 gearman_return_t ret;
00207 gearmand_con_st *dcon;
00208
00209 while (1)
00210 {
00211 server_con= gearman_server_thread_run(&(thread->server_thread), &ret);
00212 if (ret == GEARMAN_SUCCESS || ret == GEARMAN_IO_WAIT ||
00213 ret == GEARMAN_SHUTDOWN_GRACEFUL)
00214 {
00215 return;
00216 }
00217
00218 if (server_con == NULL)
00219 {
00220
00221
00222 gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00223 return;
00224 }
00225
00226 dcon= (gearmand_con_st *)gearman_server_con_data(server_con);
00227
00228 GEARMAN_INFO(thread->gearmand, "[%4u] %15s:%5s Disconnected", thread->count,
00229 dcon->host, dcon->port)
00230
00231 gearmand_con_free(dcon);
00232 }
00233 }
00234
00235
00236
00237
00238
00239 static void *_thread(void *data)
00240 {
00241 gearmand_thread_st *thread= (gearmand_thread_st *)data;
00242
00243 GEARMAN_INFO(thread->gearmand, "[%4u] Entering thread event loop",
00244 thread->count)
00245
00246 if (event_base_loop(thread->base, 0) == -1)
00247 {
00248 GEARMAN_FATAL(thread->gearmand, "_io_thread:event_base_loop:-1")
00249 thread->gearmand->ret= GEARMAN_EVENT;
00250 }
00251
00252 GEARMAN_INFO(thread->gearmand, "[%4u] Exiting thread event loop",
00253 thread->count)
00254
00255 return NULL;
00256 }
00257
00258 static void _log(const char *line, gearman_verbose_t verbose, void *context)
00259 {
00260 gearmand_thread_st *dthread= (gearmand_thread_st *)context;
00261 char buffer[GEARMAN_MAX_ERROR_SIZE];
00262
00263 snprintf(buffer, GEARMAN_MAX_ERROR_SIZE, "[%4u] %s", dthread->count, line);
00264 (*dthread->gearmand->log_fn)(buffer, verbose,
00265 (void *)dthread->gearmand->log_context);
00266 }
00267
00268 static void _run(gearman_server_thread_st *thread __attribute__ ((unused)),
00269 void *fn_arg)
00270 {
00271 gearmand_thread_st *dthread= (gearmand_thread_st *)fn_arg;
00272 gearmand_thread_wakeup(dthread, GEARMAND_WAKEUP_RUN);
00273 }
00274
00275 static gearman_return_t _wakeup_init(gearmand_thread_st *thread)
00276 {
00277 int ret;
00278
00279 GEARMAN_INFO(thread->gearmand, "Creating IO thread wakeup pipe")
00280
00281 ret= pipe(thread->wakeup_fd);
00282 if (ret == -1)
00283 {
00284 GEARMAN_FATAL(thread->gearmand, "_wakeup_init:pipe:%d", errno)
00285 return GEARMAN_ERRNO;
00286 }
00287
00288 ret= fcntl(thread->wakeup_fd[0], F_GETFL, 0);
00289 if (ret == -1)
00290 {
00291 GEARMAN_FATAL(thread->gearmand, "_wakeup_init:fcntl:F_GETFL:%d", errno)
00292 return GEARMAN_ERRNO;
00293 }
00294
00295 ret= fcntl(thread->wakeup_fd[0], F_SETFL, ret | O_NONBLOCK);
00296 if (ret == -1)
00297 {
00298 GEARMAN_FATAL(thread->gearmand, "_wakeup_init:fcntl:F_SETFL:%d", errno)
00299 return GEARMAN_ERRNO;
00300 }
00301
00302 event_set(&(thread->wakeup_event), thread->wakeup_fd[0], EV_READ | EV_PERSIST,
00303 _wakeup_event, thread);
00304 event_base_set(thread->base, &(thread->wakeup_event));
00305
00306 if (event_add(&(thread->wakeup_event), NULL) == -1)
00307 {
00308 GEARMAN_FATAL(thread->gearmand, "_wakeup_init:event_add:-1")
00309 return GEARMAN_EVENT;
00310 }
00311
00312 thread->options|= GEARMAND_THREAD_WAKEUP_EVENT;
00313
00314 return GEARMAN_SUCCESS;
00315 }
00316
00317 static void _wakeup_close(gearmand_thread_st *thread)
00318 {
00319 _wakeup_clear(thread);
00320
00321 if (thread->wakeup_fd[0] >= 0)
00322 {
00323 GEARMAN_INFO(thread->gearmand, "Closing IO thread wakeup pipe")
00324 close(thread->wakeup_fd[0]);
00325 thread->wakeup_fd[0]= -1;
00326 close(thread->wakeup_fd[1]);
00327 thread->wakeup_fd[1]= -1;
00328 }
00329 }
00330
00331 static void _wakeup_clear(gearmand_thread_st *thread)
00332 {
00333 if (thread->options & GEARMAND_THREAD_WAKEUP_EVENT)
00334 {
00335 GEARMAN_INFO(thread->gearmand,
00336 "[%4u] Clearing event for IO thread wakeup pipe",
00337 thread->count)
00338 assert(event_del(&(thread->wakeup_event)) == 0);
00339 thread->options&= (gearmand_thread_options_t)~GEARMAND_THREAD_WAKEUP_EVENT;
00340 }
00341 }
00342
00343 static void _wakeup_event(int fd, short events __attribute__ ((unused)),
00344 void *arg)
00345 {
00346 gearmand_thread_st *thread= (gearmand_thread_st *)arg;
00347 uint8_t buffer[GEARMAN_PIPE_BUFFER_SIZE];
00348 ssize_t ret;
00349 ssize_t x;
00350
00351 while (1)
00352 {
00353 ret= read(fd, buffer, GEARMAN_PIPE_BUFFER_SIZE);
00354 if (ret == 0)
00355 {
00356 _clear_events(thread);
00357 GEARMAN_FATAL(thread->gearmand, "_wakeup_event:read:EOF")
00358 thread->gearmand->ret= GEARMAN_PIPE_EOF;
00359 return;
00360 }
00361 else if (ret == -1)
00362 {
00363 if (errno == EINTR)
00364 continue;
00365
00366 if (errno == EAGAIN)
00367 break;
00368
00369 _clear_events(thread);
00370 GEARMAN_FATAL(thread->gearmand, "_wakeup_event:read:%d", errno)
00371 thread->gearmand->ret= GEARMAN_ERRNO;
00372 return;
00373 }
00374
00375 for (x= 0; x < ret; x++)
00376 {
00377 switch ((gearmand_wakeup_t)buffer[x])
00378 {
00379 case GEARMAND_WAKEUP_PAUSE:
00380 GEARMAN_INFO(thread->gearmand, "[%4u] Received PAUSE wakeup event",
00381 thread->count)
00382 break;
00383
00384 case GEARMAND_WAKEUP_SHUTDOWN_GRACEFUL:
00385 GEARMAN_INFO(thread->gearmand,
00386 "[%4u] Received SHUTDOWN_GRACEFUL wakeup event",
00387 thread->count)
00388 if (gearman_server_shutdown_graceful(&(thread->gearmand->server)) ==
00389 GEARMAN_SHUTDOWN)
00390 {
00391 gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00392 }
00393 break;
00394
00395 case GEARMAND_WAKEUP_SHUTDOWN:
00396 GEARMAN_INFO(thread->gearmand, "[%4u] Received SHUTDOWN wakeup event",
00397 thread->count)
00398 _clear_events(thread);
00399 break;
00400
00401 case GEARMAND_WAKEUP_CON:
00402 GEARMAN_INFO(thread->gearmand, "[%4u] Received CON wakeup event",
00403 thread->count)
00404 gearmand_con_check_queue(thread);
00405 break;
00406
00407 case GEARMAND_WAKEUP_RUN:
00408 GEARMAN_DEBUG(thread->gearmand, "[%4u] Received RUN wakeup event",
00409 thread->count)
00410 gearmand_thread_run(thread);
00411 break;
00412
00413 default:
00414 GEARMAN_FATAL(thread->gearmand,
00415 "[%4u] Received unknown wakeup event (%u)", thread->count,
00416 buffer[x])
00417 _clear_events(thread);
00418 thread->gearmand->ret= GEARMAN_UNKNOWN_STATE;
00419 break;
00420 }
00421 }
00422 }
00423 }
00424
00425 static void _clear_events(gearmand_thread_st *thread)
00426 {
00427 _wakeup_clear(thread);
00428
00429 while (thread->dcon_list != NULL)
00430 gearmand_con_free(thread->dcon_list);
00431 }