00001
00002
00003
00004
00005
00006
00007
00008
00014 #include "common.h"
00015
00016
00017
00018
00019
/*
 * Forward declaration of the file-local send helper; it is defined at the
 * bottom of this file and called by every gearman_job_send_*() function.
 */
00029 static gearman_return_t _job_send(gearman_job_st *job);
00030
00033
00034
00035
00036
00037 gearman_return_t gearman_job_send_data(gearman_job_st *job, const void *data,
00038 size_t data_size)
00039 {
00040 gearman_return_t ret;
00041
00042 if (!(job->options & GEARMAN_JOB_WORK_IN_USE))
00043 {
00044 ret= gearman_packet_add(job->worker->gearman, &(job->work),
00045 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_WORK_DATA,
00046 job->assigned.arg[0], job->assigned.arg_size[0],
00047 data, data_size, NULL);
00048 if (ret != GEARMAN_SUCCESS)
00049 return ret;
00050
00051 job->options|= GEARMAN_JOB_WORK_IN_USE;
00052 }
00053
00054 return _job_send(job);
00055 }
00056
00057 gearman_return_t gearman_job_send_warning(gearman_job_st *job,
00058 const void *warning,
00059 size_t warning_size)
00060 {
00061 gearman_return_t ret;
00062
00063 if (!(job->options & GEARMAN_JOB_WORK_IN_USE))
00064 {
00065 ret= gearman_packet_add(job->worker->gearman, &(job->work),
00066 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_WORK_WARNING,
00067 job->assigned.arg[0], job->assigned.arg_size[0],
00068 warning, warning_size, NULL);
00069 if (ret != GEARMAN_SUCCESS)
00070 return ret;
00071
00072 job->options|= GEARMAN_JOB_WORK_IN_USE;
00073 }
00074
00075 return _job_send(job);
00076 }
00077
00078 gearman_return_t gearman_job_send_status(gearman_job_st *job,
00079 uint32_t numerator,
00080 uint32_t denominator)
00081 {
00082 gearman_return_t ret;
00083 char numerator_string[12];
00084 char denominator_string[12];
00085
00086 if (!(job->options & GEARMAN_JOB_WORK_IN_USE))
00087 {
00088 snprintf(numerator_string, 12, "%u", numerator);
00089 snprintf(denominator_string, 12, "%u", denominator);
00090
00091 ret= gearman_packet_add(job->worker->gearman, &(job->work),
00092 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_WORK_STATUS,
00093 job->assigned.arg[0], job->assigned.arg_size[0],
00094 numerator_string, strlen(numerator_string) + 1,
00095 denominator_string, strlen(denominator_string),
00096 NULL);
00097 if (ret != GEARMAN_SUCCESS)
00098 return ret;
00099
00100 job->options|= GEARMAN_JOB_WORK_IN_USE;
00101 }
00102
00103 return _job_send(job);
00104 }
00105
00106 gearman_return_t gearman_job_send_complete(gearman_job_st *job,
00107 const void *result,
00108 size_t result_size)
00109 {
00110 gearman_return_t ret;
00111
00112 if (job->options & GEARMAN_JOB_FINISHED)
00113 return GEARMAN_SUCCESS;
00114
00115 if (!(job->options & GEARMAN_JOB_WORK_IN_USE))
00116 {
00117 ret= gearman_packet_add(job->worker->gearman, &(job->work),
00118 GEARMAN_MAGIC_REQUEST,
00119 GEARMAN_COMMAND_WORK_COMPLETE,
00120 job->assigned.arg[0], job->assigned.arg_size[0],
00121 result, result_size, NULL);
00122 if (ret != GEARMAN_SUCCESS)
00123 return ret;
00124
00125 job->options|= GEARMAN_JOB_WORK_IN_USE;
00126 }
00127
00128 ret= _job_send(job);
00129 if (ret != GEARMAN_SUCCESS)
00130 return ret;
00131
00132 job->options|= GEARMAN_JOB_FINISHED;
00133 return GEARMAN_SUCCESS;
00134 }
00135
00136 gearman_return_t gearman_job_send_exception(gearman_job_st *job,
00137 const void *exception,
00138 size_t exception_size)
00139 {
00140 gearman_return_t ret;
00141
00142 if (!(job->options & GEARMAN_JOB_WORK_IN_USE))
00143 {
00144 ret= gearman_packet_add(job->worker->gearman, &(job->work),
00145 GEARMAN_MAGIC_REQUEST,
00146 GEARMAN_COMMAND_WORK_EXCEPTION,
00147 job->assigned.arg[0], job->assigned.arg_size[0],
00148 exception, exception_size, NULL);
00149 if (ret != GEARMAN_SUCCESS)
00150 return ret;
00151
00152 job->options|= GEARMAN_JOB_WORK_IN_USE;
00153 }
00154
00155 return _job_send(job);
00156 }
00157
00158 gearman_return_t gearman_job_send_fail(gearman_job_st *job)
00159 {
00160 gearman_return_t ret;
00161
00162 if (job->options & GEARMAN_JOB_FINISHED)
00163 return GEARMAN_SUCCESS;
00164
00165 if (!(job->options & GEARMAN_JOB_WORK_IN_USE))
00166 {
00167 ret= gearman_packet_add(job->worker->gearman, &(job->work),
00168 GEARMAN_MAGIC_REQUEST, GEARMAN_COMMAND_WORK_FAIL,
00169 job->assigned.arg[0], job->assigned.arg_size[0] - 1,
00170 NULL);
00171 if (ret != GEARMAN_SUCCESS)
00172 return ret;
00173
00174 job->options|= GEARMAN_JOB_WORK_IN_USE;
00175 }
00176
00177 ret= _job_send(job);
00178 if (ret != GEARMAN_SUCCESS)
00179 return ret;
00180
00181 job->options|= GEARMAN_JOB_FINISHED;
00182 return GEARMAN_SUCCESS;
00183 }
00184
00185 char *gearman_job_handle(const gearman_job_st *job)
00186 {
00187 return (char *)job->assigned.arg[0];
00188 }
00189
00190 char *gearman_job_function_name(const gearman_job_st *job)
00191 {
00192 return (char *)job->assigned.arg[1];
00193 }
00194
00195 const char *gearman_job_unique(const gearman_job_st *job)
00196 {
00197 if (job->assigned.command == GEARMAN_COMMAND_JOB_ASSIGN_UNIQ)
00198 return (const char *)job->assigned.arg[2];
00199 return "";
00200 }
00201
00202 const void *gearman_job_workload(const gearman_job_st *job)
00203 {
00204 return job->assigned.data;
00205 }
00206
00207 size_t gearman_job_workload_size(const gearman_job_st *job)
00208 {
00209 return job->assigned.data_size;
00210 }
00211
00212 void *gearman_job_take_workload(gearman_job_st *job, size_t *data_size)
00213 {
00214 return gearman_packet_take_data(&(job->assigned), data_size);
00215 }
00216
00217
00218
00219
00220
00221 static gearman_return_t _job_send(gearman_job_st *job)
00222 {
00223 gearman_return_t ret;
00224
00225 ret= gearman_con_send(job->con, &(job->work), true);
00226 if (ret != GEARMAN_SUCCESS)
00227 return ret;
00228
00229 gearman_packet_free(&(job->work));
00230 job->options&= (gearman_job_options_t)~GEARMAN_JOB_WORK_IN_USE;
00231
00232 return GEARMAN_SUCCESS;
00233 }