00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
00020 gearman_server_packet_st *
00021 gearman_server_packet_create(gearman_server_thread_st *thread,
00022 bool from_thread)
00023 {
00024 gearman_server_packet_st *server_packet= NULL;
00025
00026 if (from_thread && thread->server->options & GEARMAN_SERVER_PROC_THREAD)
00027 {
00028 if (thread->free_packet_count > 0)
00029 {
00030 server_packet= thread->free_packet_list;
00031 thread->free_packet_list= server_packet->next;
00032 thread->free_packet_count--;
00033 }
00034 }
00035 else
00036 {
00037 if (thread->server->free_packet_count > 0)
00038 {
00039 server_packet= thread->server->free_packet_list;
00040 thread->server->free_packet_list= server_packet->next;
00041 thread->server->free_packet_count--;
00042 }
00043 }
00044
00045 if (server_packet == NULL)
00046 {
00047 server_packet= malloc(sizeof(gearman_server_packet_st));
00048 if (server_packet == NULL)
00049 {
00050 GEARMAN_ERROR_SET(thread->gearman, "gearman_server_packet_create",
00051 "malloc")
00052 return NULL;
00053 }
00054 }
00055
00056 server_packet->next= NULL;
00057
00058 return server_packet;
00059 }
00060
00061 void gearman_server_packet_free(gearman_server_packet_st *packet,
00062 gearman_server_thread_st *thread,
00063 bool from_thread)
00064 {
00065 if (from_thread && thread->server->options & GEARMAN_SERVER_PROC_THREAD)
00066 {
00067 if (thread->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
00068 {
00069 packet->next= thread->free_packet_list;
00070 thread->free_packet_list= packet;
00071 thread->free_packet_count++;
00072 }
00073 else
00074 free(packet);
00075 }
00076 else
00077 {
00078 if (thread->server->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
00079 {
00080 packet->next= thread->server->free_packet_list;
00081 thread->server->free_packet_list= packet;
00082 thread->server->free_packet_count++;
00083 }
00084 else
00085 free(packet);
00086 }
00087 }
00088
00089 gearman_return_t gearman_server_io_packet_add(gearman_server_con_st *con,
00090 bool take_data,
00091 gearman_magic_t magic,
00092 gearman_command_t command,
00093 const void *arg, ...)
00094 {
00095 gearman_server_packet_st *server_packet;
00096 va_list ap;
00097 size_t arg_size;
00098 gearman_return_t ret;
00099
00100 server_packet= gearman_server_packet_create(con->thread, false);
00101 if (server_packet == NULL)
00102 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00103
00104 if (gearman_packet_create(con->thread->gearman,
00105 &(server_packet->packet)) == NULL)
00106 {
00107 gearman_server_packet_free(server_packet, con->thread, false);
00108 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00109 }
00110
00111 server_packet->packet.magic= magic;
00112 server_packet->packet.command= command;
00113
00114 va_start(ap, arg);
00115
00116 while (arg != NULL)
00117 {
00118 arg_size = va_arg(ap, size_t);
00119
00120 ret= gearman_packet_add_arg(&(server_packet->packet), arg, arg_size);
00121 if (ret != GEARMAN_SUCCESS)
00122 {
00123 va_end(ap);
00124 gearman_packet_free(&(server_packet->packet));
00125 gearman_server_packet_free(server_packet, con->thread, false);
00126 return ret;
00127 }
00128
00129 arg = va_arg(ap, void *);
00130 }
00131
00132 va_end(ap);
00133
00134 ret= gearman_packet_pack_header(&(server_packet->packet));
00135 if (ret != GEARMAN_SUCCESS)
00136 {
00137 gearman_packet_free(&(server_packet->packet));
00138 gearman_server_packet_free(server_packet, con->thread, false);
00139 return ret;
00140 }
00141
00142 if (take_data)
00143 server_packet->packet.options|= GEARMAN_PACKET_FREE_DATA;
00144
00145 GEARMAN_SERVER_THREAD_LOCK(con->thread)
00146 GEARMAN_FIFO_ADD(con->io_packet, server_packet,)
00147 GEARMAN_SERVER_THREAD_UNLOCK(con->thread)
00148
00149 gearman_server_con_io_add(con);
00150
00151 return GEARMAN_SUCCESS;
00152 }
00153
00154 void gearman_server_io_packet_remove(gearman_server_con_st *con)
00155 {
00156 gearman_server_packet_st *server_packet= con->io_packet_list;
00157
00158 gearman_packet_free(&(server_packet->packet));
00159
00160 GEARMAN_SERVER_THREAD_LOCK(con->thread)
00161 GEARMAN_FIFO_DEL(con->io_packet, server_packet,)
00162 GEARMAN_SERVER_THREAD_UNLOCK(con->thread)
00163
00164 gearman_server_packet_free(server_packet, con->thread, true);
00165 }
00166
00167 void gearman_server_proc_packet_add(gearman_server_con_st *con,
00168 gearman_server_packet_st *packet)
00169 {
00170 GEARMAN_SERVER_THREAD_LOCK(con->thread)
00171 GEARMAN_FIFO_ADD(con->proc_packet, packet,)
00172 GEARMAN_SERVER_THREAD_UNLOCK(con->thread)
00173
00174 gearman_server_con_proc_add(con);
00175 }
00176
00177 gearman_server_packet_st *
00178 gearman_server_proc_packet_remove(gearman_server_con_st *con)
00179 {
00180 gearman_server_packet_st *server_packet= con->proc_packet_list;
00181
00182 if (server_packet == NULL)
00183 return NULL;
00184
00185 GEARMAN_SERVER_THREAD_LOCK(con->thread)
00186 GEARMAN_FIFO_DEL(con->proc_packet, server_packet,)
00187 GEARMAN_SERVER_THREAD_UNLOCK(con->thread)
00188
00189 return server_packet;
00190 }