Gearman Developer Documentation

libgearman-server/packet.c
Go to the documentation of this file.
00001 /* Gearman server and library
00002  * Copyright (C) 2008 Brian Aker, Eric Day
00003  * All rights reserved.
00004  *
00005  * Use and distribution licensed under the BSD license.  See
00006  * the COPYING file in the parent directory for full text.
00007  */
00008 
00014 #include "common.h"
00015 
00016 /*
00017  * Public definitions
00018  */
00019 
00020 gearman_server_packet_st *
00021 gearman_server_packet_create(gearman_server_thread_st *thread,
00022                              bool from_thread)
00023 {
00024   gearman_server_packet_st *server_packet= NULL;
00025 
00026   if (from_thread && thread->server->flags.threaded)
00027   {
00028     if (thread->free_packet_count > 0)
00029     {
00030       server_packet= thread->free_packet_list;
00031       thread->free_packet_list= server_packet->next;
00032       thread->free_packet_count--;
00033     }
00034   }
00035   else
00036   {
00037     if (thread->server->free_packet_count > 0)
00038     {
00039       server_packet= thread->server->free_packet_list;
00040       thread->server->free_packet_list= server_packet->next;
00041       thread->server->free_packet_count--;
00042     }
00043   }
00044 
00045   if (server_packet == NULL)
00046   {
00047     server_packet= (gearman_server_packet_st *)malloc(sizeof(gearman_server_packet_st));
00048     if (server_packet == NULL)
00049     {
00050       gearman_log_error(thread->gearman, "gearman_server_packet_create", "malloc");
00051       return NULL;
00052     }
00053   }
00054 
00055   server_packet->next= NULL;
00056 
00057   return server_packet;
00058 }
00059 
00060 void gearman_server_packet_free(gearman_server_packet_st *packet,
00061                                 gearman_server_thread_st *thread,
00062                                 bool from_thread)
00063 {
00064   if (from_thread && thread->server->flags.threaded)
00065   {
00066     if (thread->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
00067     {
00068       packet->next= thread->free_packet_list;
00069       thread->free_packet_list= packet;
00070       thread->free_packet_count++;
00071     }
00072     else
00073       free(packet);
00074   }
00075   else
00076   {
00077     if (thread->server->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
00078     {
00079       packet->next= thread->server->free_packet_list;
00080       thread->server->free_packet_list= packet;
00081       thread->server->free_packet_count++;
00082     }
00083     else
00084       free(packet);
00085   }
00086 }
00087 
00088 gearman_return_t gearman_server_io_packet_add(gearman_server_con_st *con,
00089                                               bool take_data,
00090                                               enum gearman_magic_t magic,
00091                                               gearman_command_t command,
00092                                               const void *arg, ...)
00093 {
00094   gearman_server_packet_st *server_packet;
00095   va_list ap;
00096   size_t arg_size;
00097   gearman_return_t ret;
00098 
00099   server_packet= gearman_server_packet_create(con->thread, false);
00100   if (server_packet == NULL)
00101     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00102 
00103   if (gearman_packet_create(con->thread->gearman,
00104                             &(server_packet->packet)) == NULL)
00105   {
00106     gearman_server_packet_free(server_packet, con->thread, false);
00107     return GEARMAN_MEMORY_ALLOCATION_FAILURE;
00108   }
00109 
00110   server_packet->packet.magic= magic;
00111   server_packet->packet.command= command;
00112 
00113   va_start(ap, arg);
00114 
00115   while (arg != NULL)
00116   {
00117     arg_size = va_arg(ap, size_t);
00118 
00119     ret= gearman_packet_create_arg(&(server_packet->packet), arg, arg_size);
00120     if (ret != GEARMAN_SUCCESS)
00121     {
00122       va_end(ap);
00123       gearman_packet_free(&(server_packet->packet));
00124       gearman_server_packet_free(server_packet, con->thread, false);
00125       return ret;
00126     }
00127 
00128     arg = va_arg(ap, void *);
00129   }
00130 
00131   va_end(ap);
00132 
00133   ret= gearman_packet_pack_header(&(server_packet->packet));
00134   if (ret != GEARMAN_SUCCESS)
00135   {
00136     gearman_packet_free(&(server_packet->packet));
00137     gearman_server_packet_free(server_packet, con->thread, false);
00138     return ret;
00139   }
00140 
00141   if (take_data)
00142     server_packet->packet.options.free_data= true;
00143 
00144   (void) pthread_mutex_lock(&con->thread->lock);
00145   GEARMAN_FIFO_ADD(con->io_packet, server_packet,);
00146   (void) pthread_mutex_unlock(&con->thread->lock);
00147 
00148   gearman_server_con_io_add(con);
00149 
00150   return GEARMAN_SUCCESS;
00151 }
00152 
00153 void gearman_server_io_packet_remove(gearman_server_con_st *con)
00154 {
00155   gearman_server_packet_st *server_packet= con->io_packet_list;
00156 
00157   gearman_packet_free(&(server_packet->packet));
00158 
00159   (void) pthread_mutex_lock(&con->thread->lock);
00160   GEARMAN_FIFO_DEL(con->io_packet, server_packet,);
00161   (void) pthread_mutex_unlock(&con->thread->lock);
00162 
00163   gearman_server_packet_free(server_packet, con->thread, true);
00164 }
00165 
00166 void gearman_server_proc_packet_add(gearman_server_con_st *con,
00167                                     gearman_server_packet_st *packet)
00168 {
00169   (void) pthread_mutex_lock(&con->thread->lock);
00170   GEARMAN_FIFO_ADD(con->proc_packet, packet,);
00171   (void) pthread_mutex_unlock(&con->thread->lock);
00172 
00173   gearman_server_con_proc_add(con);
00174 }
00175 
00176 gearman_server_packet_st *
00177 gearman_server_proc_packet_remove(gearman_server_con_st *con)
00178 {
00179   gearman_server_packet_st *server_packet= con->proc_packet_list;
00180 
00181   if (server_packet == NULL)
00182     return NULL;
00183 
00184   (void) pthread_mutex_lock(&con->thread->lock);
00185   GEARMAN_FIFO_DEL(con->proc_packet, server_packet,);
00186   (void) pthread_mutex_unlock(&con->thread->lock);
00187 
00188   return server_packet;
00189 }