Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00020 gearman_server_packet_st *
00021 gearman_server_packet_create(gearman_server_thread_st *thread,
00022 bool from_thread)
00023 {
00024 gearman_server_packet_st *server_packet= NULL;
00025
00026 if (from_thread && thread->server->flags.threaded)
00027 {
00028 if (thread->free_packet_count > 0)
00029 {
00030 server_packet= thread->free_packet_list;
00031 thread->free_packet_list= server_packet->next;
00032 thread->free_packet_count--;
00033 }
00034 }
00035 else
00036 {
00037 if (thread->server->free_packet_count > 0)
00038 {
00039 server_packet= thread->server->free_packet_list;
00040 thread->server->free_packet_list= server_packet->next;
00041 thread->server->free_packet_count--;
00042 }
00043 }
00044
00045 if (server_packet == NULL)
00046 {
00047 server_packet= (gearman_server_packet_st *)malloc(sizeof(gearman_server_packet_st));
00048 if (server_packet == NULL)
00049 {
00050 gearman_log_error(thread->gearman, "gearman_server_packet_create", "malloc");
00051 return NULL;
00052 }
00053 }
00054
00055 server_packet->next= NULL;
00056
00057 return server_packet;
00058 }
00059
00060 void gearman_server_packet_free(gearman_server_packet_st *packet,
00061 gearman_server_thread_st *thread,
00062 bool from_thread)
00063 {
00064 if (from_thread && thread->server->flags.threaded)
00065 {
00066 if (thread->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
00067 {
00068 packet->next= thread->free_packet_list;
00069 thread->free_packet_list= packet;
00070 thread->free_packet_count++;
00071 }
00072 else
00073 free(packet);
00074 }
00075 else
00076 {
00077 if (thread->server->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
00078 {
00079 packet->next= thread->server->free_packet_list;
00080 thread->server->free_packet_list= packet;
00081 thread->server->free_packet_count++;
00082 }
00083 else
00084 free(packet);
00085 }
00086 }
00087
00088 gearman_return_t gearman_server_io_packet_add(gearman_server_con_st *con,
00089 bool take_data,
00090 enum gearman_magic_t magic,
00091 gearman_command_t command,
00092 const void *arg, ...)
00093 {
00094 gearman_server_packet_st *server_packet;
00095 va_list ap;
00096 size_t arg_size;
00097 gearman_return_t ret;
00098
00099 server_packet= gearman_server_packet_create(con->thread, false);
00100 if (server_packet == NULL)
00101 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00102
00103 if (gearman_packet_create(con->thread->gearman,
00104 &(server_packet->packet)) == NULL)
00105 {
00106 gearman_server_packet_free(server_packet, con->thread, false);
00107 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00108 }
00109
00110 server_packet->packet.magic= magic;
00111 server_packet->packet.command= command;
00112
00113 va_start(ap, arg);
00114
00115 while (arg != NULL)
00116 {
00117 arg_size = va_arg(ap, size_t);
00118
00119 ret= gearman_packet_create_arg(&(server_packet->packet), arg, arg_size);
00120 if (ret != GEARMAN_SUCCESS)
00121 {
00122 va_end(ap);
00123 gearman_packet_free(&(server_packet->packet));
00124 gearman_server_packet_free(server_packet, con->thread, false);
00125 return ret;
00126 }
00127
00128 arg = va_arg(ap, void *);
00129 }
00130
00131 va_end(ap);
00132
00133 ret= gearman_packet_pack_header(&(server_packet->packet));
00134 if (ret != GEARMAN_SUCCESS)
00135 {
00136 gearman_packet_free(&(server_packet->packet));
00137 gearman_server_packet_free(server_packet, con->thread, false);
00138 return ret;
00139 }
00140
00141 if (take_data)
00142 server_packet->packet.options.free_data= true;
00143
00144 (void) pthread_mutex_lock(&con->thread->lock);
00145 GEARMAN_FIFO_ADD(con->io_packet, server_packet,);
00146 (void) pthread_mutex_unlock(&con->thread->lock);
00147
00148 gearman_server_con_io_add(con);
00149
00150 return GEARMAN_SUCCESS;
00151 }
00152
00153 void gearman_server_io_packet_remove(gearman_server_con_st *con)
00154 {
00155 gearman_server_packet_st *server_packet= con->io_packet_list;
00156
00157 gearman_packet_free(&(server_packet->packet));
00158
00159 (void) pthread_mutex_lock(&con->thread->lock);
00160 GEARMAN_FIFO_DEL(con->io_packet, server_packet,);
00161 (void) pthread_mutex_unlock(&con->thread->lock);
00162
00163 gearman_server_packet_free(server_packet, con->thread, true);
00164 }
00165
00166 void gearman_server_proc_packet_add(gearman_server_con_st *con,
00167 gearman_server_packet_st *packet)
00168 {
00169 (void) pthread_mutex_lock(&con->thread->lock);
00170 GEARMAN_FIFO_ADD(con->proc_packet, packet,);
00171 (void) pthread_mutex_unlock(&con->thread->lock);
00172
00173 gearman_server_con_proc_add(con);
00174 }
00175
00176 gearman_server_packet_st *
00177 gearman_server_proc_packet_remove(gearman_server_con_st *con)
00178 {
00179 gearman_server_packet_st *server_packet= con->proc_packet_list;
00180
00181 if (server_packet == NULL)
00182 return NULL;
00183
00184 (void) pthread_mutex_lock(&con->thread->lock);
00185 GEARMAN_FIFO_DEL(con->proc_packet, server_packet,);
00186 (void) pthread_mutex_unlock(&con->thread->lock);
00187
00188 return server_packet;
00189 }