00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015 #include "gearmand.h"
00016
00017
00018
00019
00020
00027 static void _con_ready(int fd, short events, void *arg);
00028
00029 static gearman_return_t _con_add(gearmand_thread_st *thread,
00030 gearmand_con_st *con);
00031
00034
00035
00036
00037
00038 gearman_return_t gearmand_con_create(gearmand_st *gearmand, int fd,
00039 const char *host, const char *port,
00040 gearman_con_add_fn *add_fn)
00041 {
00042 gearmand_con_st *dcon;
00043 gearmand_con_st *free_dcon_list;
00044 uint32_t free_dcon_count;
00045
00046 if (gearmand->free_dcon_count > 0)
00047 {
00048 dcon= gearmand->free_dcon_list;
00049 GEARMAN_LIST_DEL(gearmand->free_dcon, dcon,)
00050 }
00051 else
00052 {
00053 dcon= malloc(sizeof(gearmand_con_st));
00054 if (dcon == NULL)
00055 {
00056 close(fd);
00057 GEARMAN_FATAL(gearmand, "gearmand_con_create:malloc")
00058 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00059 }
00060 }
00061
00062 dcon->last_events= 0;
00063 dcon->fd= fd;
00064 dcon->next= NULL;
00065 dcon->prev= NULL;
00066 dcon->server_con= NULL;
00067 dcon->con= NULL;
00068 dcon->add_fn= NULL;
00069 strncpy(dcon->host, host, NI_MAXHOST - 1);
00070 strncpy(dcon->port, port, NI_MAXSERV - 1);
00071 dcon->add_fn= add_fn;
00072
00073
00074 if (gearmand->threads == 0)
00075 {
00076 dcon->thread= gearmand->thread_list;
00077 return _con_add(gearmand->thread_list, dcon);
00078 }
00079
00080
00081 if (gearmand->thread_add_next == NULL)
00082 gearmand->thread_add_next= gearmand->thread_list;
00083
00084 dcon->thread= gearmand->thread_add_next;
00085
00086
00087 if (dcon->thread->dcon_add_count == 0 &&
00088 dcon->thread->free_dcon_count < gearmand->max_thread_free_dcon_count)
00089 {
00090 GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
00091 gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
00092 }
00093 else
00094 {
00095 (void ) pthread_mutex_lock(&(dcon->thread->lock));
00096
00097 GEARMAN_LIST_ADD(dcon->thread->dcon_add, dcon,)
00098
00099
00100 free_dcon_list= dcon->thread->free_dcon_list;
00101 free_dcon_count= dcon->thread->free_dcon_count;
00102 dcon->thread->free_dcon_list= NULL;
00103 dcon->thread->free_dcon_count= 0;
00104
00105 (void ) pthread_mutex_unlock(&(dcon->thread->lock));
00106
00107
00108
00109
00110 if (dcon->thread->dcon_add_count == 1)
00111 gearmand_thread_wakeup(dcon->thread, GEARMAND_WAKEUP_CON);
00112
00113
00114 while (free_dcon_list != NULL)
00115 {
00116 dcon= free_dcon_list;
00117 GEARMAN_LIST_DEL(free_dcon, dcon,)
00118 GEARMAN_LIST_ADD(gearmand->free_dcon, dcon,)
00119 }
00120 }
00121
00122 gearmand->thread_add_next= gearmand->thread_add_next->next;
00123
00124 return GEARMAN_SUCCESS;
00125 }
00126
00127 void gearmand_con_free(gearmand_con_st *dcon)
00128 {
00129 assert(event_del(&(dcon->event)) == 0);
00130
00131
00132 event_set(&(dcon->event), dcon->fd, EV_READ, _con_ready, dcon);
00133 event_base_set(dcon->thread->base, &(dcon->event));
00134 event_add(&(dcon->event), NULL);
00135 assert(event_del(&(dcon->event)) == 0);
00136
00137 gearman_server_con_free(dcon->server_con);
00138 GEARMAN_LIST_DEL(dcon->thread->dcon, dcon,)
00139
00140 close(dcon->fd);
00141
00142 if (dcon->thread->gearmand->free_dcon_count < GEARMAN_MAX_FREE_SERVER_CON)
00143 {
00144 if (dcon->thread->gearmand->threads == 0)
00145 GEARMAN_LIST_ADD(dcon->thread->gearmand->free_dcon, dcon,)
00146 else
00147 {
00148
00149 (void ) pthread_mutex_lock(&(dcon->thread->lock));
00150 GEARMAN_LIST_ADD(dcon->thread->free_dcon, dcon,)
00151 (void ) pthread_mutex_unlock(&(dcon->thread->lock));
00152 }
00153 }
00154 else
00155 free(dcon);
00156 }
00157
00158 void gearmand_con_check_queue(gearmand_thread_st *thread)
00159 {
00160 gearmand_con_st *dcon;
00161
00162
00163 if (thread->dcon_add_count == 0)
00164 return;
00165
00166
00167
00168 while (thread->dcon_add_list != NULL)
00169 {
00170 (void ) pthread_mutex_lock(&(thread->lock));
00171 dcon= thread->dcon_add_list;
00172 GEARMAN_LIST_DEL(thread->dcon_add, dcon,)
00173 (void ) pthread_mutex_unlock(&(thread->lock));
00174
00175 if (_con_add(thread, dcon) != GEARMAN_SUCCESS)
00176 gearmand_wakeup(thread->gearmand, GEARMAND_WAKEUP_SHUTDOWN);
00177 }
00178 }
00179
00180 gearman_return_t gearmand_con_watch(gearman_con_st *con, short events,
00181 void *arg __attribute__ ((unused)))
00182 {
00183 (void) arg;
00184 gearmand_con_st *dcon;
00185 short set_events= 0;
00186
00187 dcon= (gearmand_con_st *)gearman_con_context(con);
00188 dcon->con= con;
00189
00190 if (events & POLLIN)
00191 set_events|= EV_READ;
00192 if (events & POLLOUT)
00193 set_events|= EV_WRITE;
00194
00195 if (dcon->last_events != set_events)
00196 {
00197 if (dcon->last_events != 0)
00198 assert(event_del(&(dcon->event)) == 0);
00199 event_set(&(dcon->event), dcon->fd, set_events | EV_PERSIST, _con_ready,
00200 dcon);
00201 event_base_set(dcon->thread->base, &(dcon->event));
00202
00203 if (event_add(&(dcon->event), NULL) == -1)
00204 {
00205 GEARMAN_FATAL(dcon->thread->gearmand, "_con_watch:event_add:-1")
00206 return GEARMAN_EVENT;
00207 }
00208
00209 dcon->last_events= set_events;
00210 }
00211
00212 GEARMAN_CRAZY(dcon->thread->gearmand, "[%4u] %15s:%5s Watching %6s %s",
00213 dcon->thread->count, dcon->host, dcon->port,
00214 events & POLLIN ? "POLLIN" : "",
00215 events & POLLOUT ? "POLLOUT" : "")
00216
00217 return GEARMAN_SUCCESS;
00218 }
00219
00220
00221
00222
00223
00224 static void _con_ready(int fd __attribute__ ((unused)), short events,
00225 void *arg)
00226 {
00227 gearmand_con_st *dcon= (gearmand_con_st *)arg;
00228 short revents= 0;
00229 gearman_return_t ret;
00230
00231 if (events & EV_READ)
00232 revents|= POLLIN;
00233 if (events & EV_WRITE)
00234 revents|= POLLOUT;
00235
00236 ret= gearman_con_set_revents(dcon->con, revents);
00237 if (ret != GEARMAN_SUCCESS)
00238 {
00239 gearmand_con_free(dcon);
00240 return;
00241 }
00242
00243 GEARMAN_CRAZY(dcon->thread->gearmand, "[%4u] %15s:%5s Ready %6s %s",
00244 dcon->thread->count, dcon->host, dcon->port,
00245 revents & POLLIN ? "POLLIN" : "",
00246 revents & POLLOUT ? "POLLOUT" : "")
00247
00248 gearmand_thread_run(dcon->thread);
00249 }
00250
00251 static gearman_return_t _con_add(gearmand_thread_st *thread,
00252 gearmand_con_st *dcon)
00253 {
00254 gearman_return_t ret;
00255
00256 dcon->server_con= gearman_server_con_add(&(thread->server_thread), dcon->fd,
00257 dcon);
00258 if (dcon->server_con == NULL)
00259 {
00260 close(dcon->fd);
00261 free(dcon);
00262 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00263 }
00264
00265 gearman_server_con_set_host(dcon->server_con, dcon->host);
00266 gearman_server_con_set_port(dcon->server_con, dcon->port);
00267
00268 if (dcon->add_fn != NULL)
00269 {
00270 ret= (*dcon->add_fn)(gearman_server_con_con(dcon->server_con));
00271 if (ret != GEARMAN_SUCCESS)
00272 {
00273 gearman_server_con_free(dcon->server_con);
00274 close(dcon->fd);
00275 free(dcon);
00276 return ret;
00277 }
00278 }
00279
00280 GEARMAN_INFO(thread->gearmand, "[%4u] %15s:%5s Connected", thread->count,
00281 dcon->host, dcon->port)
00282
00283 GEARMAN_LIST_ADD(thread->dcon, dcon,)
00284
00285 return GEARMAN_SUCCESS;
00286 }