OpenDNSSEC-signer  1.4.6
worker.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2009 NLNet Labs. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  * notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
14  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
15  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
17  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
19  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
21  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
22  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
23  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  *
25  */
26 
32 #include "daemon/engine.h"
33 #include "daemon/worker.h"
34 #include "shared/allocator.h"
35 #include "shared/duration.h"
36 #include "shared/hsm.h"
37 #include "shared/locks.h"
38 #include "shared/log.h"
39 #include "shared/status.h"
40 #include "signer/tools.h"
41 #include "signer/zone.h"
42 
43 #include <time.h> /* time() */
44 
/* Rows of the table that worker2str() searches with ods_lookup_by_id():
 * each row pairs a worker id with its printable name, and the { 0, NULL }
 * row comes last (presumably the end-of-table sentinel; confirm against
 * ods_lookup_by_id()).
 * NOTE(review): the declarator line (listing line 45) is absent from this
 * listing; the cross-reference index gives it as
 * `ods_lookup_table worker_str[]`. */
46  { WORKER_WORKER, "worker" },
47  { WORKER_DRUDGER, "drudger" },
48  { 0, NULL }
49 };
50 
51 
56 static const char*
57 worker2str(worker_id type)
58 {
59  ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
60  if (lt) {
61  return lt->name;
62  }
63  return NULL;
64 }
65 
66 
/**
 * Allocate and initialise a worker.
 *
 * The worker is taken from `allocator`; its lock and alarm are set up and
 * every pointer, counter and flag is reset. thread_num is stored 1-based
 * (num + 1).
 *
 * \param[in] allocator memory allocator; NULL makes the function return NULL
 * \param[in] num zero-based thread number
 * \param[in] type worker type, stored as-is in worker->type
 * \return the new worker, or NULL when allocator is NULL or the allocation
 *         fails
 *
 * NOTE(review): the return-type line (listing line 71) is absent from this
 * listing; the cross-reference index gives the return type as
 * `worker_type *`.
 */
72 worker_create(allocator_type* allocator, int num, worker_id type)
73 {
74  worker_type* worker;
75  if (!allocator) {
76  return NULL;
77  }
78  worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
79  if (!worker) {
80  return NULL;
81  }
82  ods_log_debug("[%s[%i]] create", worker2str(type), num+1);
83  lock_basic_init(&worker->worker_lock);
84  lock_basic_set(&worker->worker_alarm);
 /* the fields are filled in while worker_lock is held */
85  lock_basic_lock(&worker->worker_lock);
86  worker->allocator = allocator;
87  worker->thread_num = num +1;
88  worker->engine = NULL;
89  worker->task = NULL;
90  worker->working_with = TASK_NONE;
91  worker->need_to_exit = 0;
92  worker->type = type;
93  worker->clock_in = 0;
94  worker->jobs_appointed = 0;
95  worker->jobs_completed = 0;
96  worker->jobs_failed = 0;
97  worker->sleeping = 0;
98  worker->waiting = 0;
 /* NOTE(review): listing line 99 is absent here. No unlock of worker_lock
  * is visible before the return; presumably the missing line is the
  * matching lock_basic_unlock(). Confirm against the real file. */
100  return worker;
101 }
102 
103 
108 static void
109 worker_working_with(worker_type* worker, task_id with, task_id next,
110  const char* str, const char* name, task_id* what, time_t* when)
111 {
112  worker->working_with = with;
113  ods_log_verbose("[%s[%i]] %s zone %s", worker2str(worker->type),
114  worker->thread_num, str, name);
115  *what = next;
116  *when = time_now();
117  return;
118 }
119 
120 
125 static int
126 worker_fulfilled(worker_type* worker)
127 {
128  int ret = 0;
129  ret = (worker->jobs_completed + worker->jobs_failed) ==
130  worker->jobs_appointed;
131  return ret;
132 }
133 
134 
139 static void
140 worker_clear_jobs(worker_type* worker)
141 {
142  ods_log_assert(worker);
143  lock_basic_lock(&worker->worker_lock);
144  worker->jobs_appointed = 0;
145  worker->jobs_completed = 0;
146  worker->jobs_failed = 0;
147  lock_basic_unlock(&worker->worker_lock);
148  return;
149 }
150 
151 
/**
 * Push one RRset onto the signing queue on behalf of a worker.
 *
 * fifoq_push() is retried for as long as it returns ODS_STATUS_UNCHANGED,
 * sleeping on q->q_nonfull between attempts (presumably UNCHANGED means the
 * queue is full; confirm against fifoq_push()). The retry loop is abandoned
 * when the worker needs to exit. After a successful push the worker's
 * jobs_appointed counter is incremented under worker_lock.
 *
 * \param[in] worker worker that appoints the job
 * \param[in] q signing queue
 * \param[in] rrset RRset to queue
 *
 * NOTE(review): listing lines 159, 170, 173-178 and 182 are absent from
 * this listing, so the declaration of `status` and any unlock of q->q_lock
 * are not visible here. Confirm against the real file.
 */
156 static void
157 worker_queue_rrset(worker_type* worker, fifoq_type* q, rrset_type* rrset)
158 {
 /* NOTE(review): listing line 159 is absent; `status` is used below but its
  * declaration is not visible. */
160  int tries = 0;
161  ods_log_assert(worker);
162  ods_log_assert(q);
163  ods_log_assert(rrset);
164 
165  lock_basic_lock(&q->q_lock);
166  status = fifoq_push(q, (void*) rrset, worker, &tries);
167  while (status == ODS_STATUS_UNCHANGED) {
168  tries++;
169  if (worker->need_to_exit) {
 /* NOTE(review): listing line 170 is absent; q->q_lock is held at this
  * point and no unlock is visible before the return. */
171  return;
172  }
 /* NOTE(review): listing lines 173-178 are absent here. */
 /* wait on q_nonfull (third argument 5) before pushing again */
179  lock_basic_sleep(&q->q_nonfull, &q->q_lock, 5);
180  status = fifoq_push(q, (void*) rrset, worker, &tries);
181  }
 /* NOTE(review): listing line 182 is absent; presumably the unlock of
  * q->q_lock. Confirm. */
183 
184  ods_log_assert(status == ODS_STATUS_OK);
 /* one more job has been handed out on behalf of this worker */
185  lock_basic_lock(&worker->worker_lock);
186  worker->jobs_appointed += 1;
187  lock_basic_unlock(&worker->worker_lock);
188  return;
189 }
190 
191 
196 static void
197 worker_queue_domain(worker_type* worker, fifoq_type* q, domain_type* domain)
198 {
199  rrset_type* rrset = NULL;
200  denial_type* denial = NULL;
201  ods_log_assert(worker);
202  ods_log_assert(q);
203  ods_log_assert(domain);
204  rrset = domain->rrsets;
205  while (rrset) {
206  worker_queue_rrset(worker, q, rrset);
207  rrset = rrset->next;
208  }
209  denial = (denial_type*) domain->denial;
210  if (denial && denial->rrset) {
211  worker_queue_rrset(worker, q, denial->rrset);
212  }
213  return;
214 }
215 
216 
221 static void
222 worker_queue_zone(worker_type* worker, fifoq_type* q, zone_type* zone)
223 {
224  ldns_rbnode_t* node = LDNS_RBTREE_NULL;
225  domain_type* domain = NULL;
226  ods_log_assert(worker);
227  ods_log_assert(q);
228  ods_log_assert(zone);
229  worker_clear_jobs(worker);
230  if (!zone->db || !zone->db->domains) {
231  return;
232  }
233  if (zone->db->domains->root != LDNS_RBTREE_NULL) {
234  node = ldns_rbtree_first(zone->db->domains);
235  }
236  while (node && node != LDNS_RBTREE_NULL) {
237  domain = (domain_type*) node->data;
238  worker_queue_domain(worker, q, domain);
239  node = ldns_rbtree_next(node);
240  }
241  return;
242 }
243 
244 
249 static ods_status
250 worker_check_jobs(worker_type* worker, task_type* task)
251 {
252  ods_log_assert(worker);
253  ods_log_assert(task);
254  lock_basic_lock(&worker->worker_lock);
255  if (worker->jobs_failed) {
256  ods_log_error("[%s[%i]] sign zone %s failed: %u RRsets failed",
257  worker2str(worker->type), worker->thread_num,
258  task_who2str(task), worker->jobs_failed);
259  lock_basic_unlock(&worker->worker_lock);
260  return ODS_STATUS_ERR;
261  } else if (worker->jobs_completed != worker->jobs_appointed) {
262  ods_log_error("[%s[%i]] sign zone %s failed: processed %u of %u "
263  "RRsets", worker2str(worker->type), worker->thread_num,
264  task_who2str(task), worker->jobs_completed,
265  worker->jobs_appointed);
266  lock_basic_unlock(&worker->worker_lock);
267  return ODS_STATUS_ERR;
268  } else if (worker->need_to_exit) {
269  ods_log_debug("[%s[%i]] sign zone %s failed: worker needs to exit",
270  worker2str(worker->type), worker->thread_num, task_who2str(task));
271  lock_basic_unlock(&worker->worker_lock);
272  return ODS_STATUS_ERR;
273  } else {
274  ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u RRsets "
275  "succeeded", worker2str(worker->type), worker->thread_num,
276  task_who2str(task), worker->jobs_completed,
277  worker->jobs_appointed);
278  ods_log_assert(worker->jobs_appointed == worker->jobs_completed);
279  }
280  lock_basic_unlock(&worker->worker_lock);
281  return ODS_STATUS_OK;
282 }
283 
284 
/**
 * Perform the task currently assigned to a worker.
 *
 * Dispatches on task->what. The TASK_SIGNCONF, TASK_READ, TASK_SIGN and
 * TASK_WRITE cases deliberately run into each other (their "break"
 * statements are commented out), so a successful step continues with the
 * next one in the same call. On the normal path the follow-up task and time
 * are written back into the task and, after a successful write,
 * zone_backup2() is called. A failing step jumps to task_perform_fail
 * (backoff) when no halted task is recorded, and to task_perform_continue
 * (resume the halted task) otherwise.
 *
 * Returns immediately when the worker, its task, the task's zone or the
 * worker's engine is NULL.
 *
 * \param[in] worker worker whose task is performed
 *
 * NOTE(review): listing lines 400, 408, 431, 433, 467 and 470 are absent
 * from this listing; see the notes at each gap.
 */
289 static void
290 worker_perform_task(worker_type* worker)
291 {
292  engine_type* engine = NULL;
293  zone_type* zone = NULL;
294  task_type* task = NULL;
295  task_id what = TASK_NONE;
296  time_t when = 0;
 /* 365 days expressed in seconds; used to push a TASK_NONE far away */
297  time_t never = (3600*24*365);
298  ods_status status = ODS_STATUS_OK;
299  int backup = 0;
300  time_t start = 0;
301  time_t end = 0;
302 
303  if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
304  return;
305  }
306  engine = (engine_type*) worker->engine;
307  task = (task_type*) worker->task;
308  zone = (zone_type*) worker->task->zone;
309  ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
310  worker2str(worker->type), worker->thread_num, task_what2str(task->what),
311  task_who2str(task), (uint32_t) worker->clock_in);
312  /* do what you have been told to do */
313  switch (task->what) {
314  case TASK_SIGNCONF:
315  /* perform 'load signconf' task */
316  worker_working_with(worker, TASK_SIGNCONF, TASK_READ,
317  "configure", task_who2str(task), &what, &when);
318  status = tools_signconf(zone);
 /* "unchanged" without a last_modified stamp is turned into an error */
319  if (status == ODS_STATUS_UNCHANGED) {
320  if (!zone->signconf->last_modified) {
321  ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
322  worker2str(worker->type), worker->thread_num,
323  task_who2str(task));
324  status = ODS_STATUS_ERR;
325  }
326  }
 /* still unchanged: resume a halted task other than TASK_SIGNCONF if one
  * is recorded, otherwise carry on as if the load had succeeded */
327  if (status == ODS_STATUS_UNCHANGED) {
328  if (task->halted != TASK_NONE && task->halted != TASK_SIGNCONF) {
329  goto task_perform_continue;
330  }
331  status = ODS_STATUS_OK;
332  } else if (status == ODS_STATUS_OK) {
333  task->interrupt = TASK_NONE;
334  task->halted = TASK_NONE;
335  } else {
336  if (task->halted == TASK_NONE) {
337  goto task_perform_fail;
338  }
339  goto task_perform_continue;
340  }
 /* no break: execution continues with TASK_READ */
341  /* break; */
342  case TASK_READ:
343  /* perform 'read input adapter' task */
344  worker_working_with(worker, TASK_READ, TASK_SIGN,
345  "read", task_who2str(task), &what, &when);
346  task->what = TASK_READ;
347  if (!zone->signconf->last_modified) {
348  ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
349  worker2str(worker->type), worker->thread_num,
350  task_who2str(task));
351  status = ODS_STATUS_ERR;
352  } else {
353  lhsm_check_connection((void*)engine);
354  status = tools_input(zone);
355  }
356 
 /* unchanged input is not an error; continue with signing */
357  if (status == ODS_STATUS_UNCHANGED) {
358  ods_log_verbose("[%s[%i]] zone %s unsigned data not changed, "
359  "continue", worker2str(worker->type), worker->thread_num,
360  task_who2str(task));
361  status = ODS_STATUS_OK;
362  }
363  if (status == ODS_STATUS_OK) {
364  if (task->interrupt > TASK_SIGNCONF) {
365  task->interrupt = TASK_NONE;
366  task->halted = TASK_NONE;
367  }
368  } else {
369  if (task->halted == TASK_NONE) {
370  goto task_perform_fail;
371  }
372  goto task_perform_continue;
373  }
 /* no break: execution continues with TASK_SIGN */
374  /* break; */
375  case TASK_SIGN:
376  /* perform 'sign' task */
377  worker_working_with(worker, TASK_SIGN, TASK_WRITE,
378  "sign", task_who2str(task), &what, &when);
379  task->what = TASK_SIGN;
380  status = zone_update_serial(zone);
381  if (status == ODS_STATUS_OK) {
382  if (task->interrupt > TASK_SIGNCONF) {
383  task->interrupt = TASK_NONE;
384  task->halted = TASK_NONE;
385  }
386  } else {
387  ods_log_error("[%s[%i]] unable to sign zone %s: "
388  "failed to increment serial",
389  worker2str(worker->type), worker->thread_num,
390  task_who2str(task));
391  if (task->halted == TASK_NONE) {
392  goto task_perform_fail;
393  }
394  goto task_perform_continue;
395  }
396 
397  /* start timer */
398  start = time(NULL);
399  if (zone->stats) {
 /* NOTE(review): listing line 400 is absent here (and 408 below);
  * presumably they lock and unlock the stats_lock named in the
  * cross-reference index. Confirm. */
401  if (!zone->stats->start_time) {
402  zone->stats->start_time = start;
403  }
404  zone->stats->sig_count = 0;
405  zone->stats->sig_soa_count = 0;
406  zone->stats->sig_reuse = 0;
407  zone->stats->sig_time = 0;
 /* NOTE(review): listing line 408 is absent here. */
409  }
410  /* check the HSM connection before queuing sign operations */
411  lhsm_check_connection((void*)engine);
412  /* prepare keys */
413  status = zone_prepare_keys(zone);
414  if (status == ODS_STATUS_OK) {
415  /* queue menial, hard signing work */
416  worker_queue_zone(worker, engine->signq, zone);
417  ods_log_deeebug("[%s[%i]] wait until drudgers are finished "
418  "signing zone %s", worker2str(worker->type),
419  worker->thread_num, task_who2str(task));
420  /* sleep until work is done */
421  worker_sleep_unless(worker, 0);
422  }
423  /* stop timer */
424  end = time(NULL);
425  /* check status and jobs */
426  if (status == ODS_STATUS_OK) {
427  status = worker_check_jobs(worker, task);
428  }
 /* the counters are reset whether or not the jobs succeeded */
429  worker_clear_jobs(worker);
430  if (status == ODS_STATUS_OK && zone->stats) {
 /* NOTE(review): listing lines 431 and 433 are absent around the
  * next statement; presumably the stats lock/unlock pair. Confirm. */
432  zone->stats->sig_time = (end-start);
434  }
435  if (status != ODS_STATUS_OK) {
436  if (task->halted == TASK_NONE) {
437  goto task_perform_fail;
438  }
439  goto task_perform_continue;
440  } else {
441  if (task->interrupt > TASK_SIGNCONF) {
442  task->interrupt = TASK_NONE;
443  task->halted = TASK_NONE;
444  }
445  }
 /* no break: execution continues with TASK_WRITE */
446  /* break; */
447  case TASK_WRITE:
448  /* perform 'write to output adapter' task */
449  worker_working_with(worker, TASK_WRITE, TASK_SIGN,
450  "write", task_who2str(task), &what, &when);
451  task->what = TASK_WRITE;
452  status = tools_output(zone, engine);
453  if (status == ODS_STATUS_OK) {
454  if (task->interrupt > TASK_SIGNCONF) {
455  task->interrupt = TASK_NONE;
456  task->halted = TASK_NONE;
457  }
458  } else {
459  /* clear signatures? */
460  if (task->halted == TASK_NONE) {
461  goto task_perform_fail;
462  }
463  goto task_perform_continue;
464  }
465  zone->db->is_processed = 1;
 /* schedule the next sign run relative to clock_in */
466  if (zone->signconf &&
 /* NOTE(review): listing line 467 (rest of this condition) is absent;
  * judging by the error text below it presumably tests
  * duration2time() of the resign interval. Confirm. */
468  what = TASK_SIGN;
469  when = worker->clock_in +
 /* NOTE(review): listing line 470 (right-hand operand of the sum) is
  * absent here. */
471  } else {
472  ods_log_error("[%s[%i]] unable to retrieve resign interval "
473  "for zone %s: duration2time() failed",
474  worker2str(worker->type), worker->thread_num,
475  task_who2str(task));
476  ods_log_info("[%s[%i]] defaulting to 1H resign interval for "
477  "zone %s", worker2str(worker->type), worker->thread_num,
478  task_who2str(task));
479  what = TASK_SIGN;
480  when = worker->clock_in + 3600;
481  }
 /* a completed write triggers the backup at the end of the function */
482  backup = 1;
483  break;
484  case TASK_NONE:
485  worker->working_with = TASK_NONE;
486  /* no task */
487  ods_log_warning("[%s[%i]] none task for zone %s",
488  worker2str(worker->type), worker->thread_num,
489  task_who2str(task));
490  when = time_now() + never;
491  break;
492  default:
493  worker->working_with = TASK_NONE;
494  /* unknown task */
495  ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
496  worker2str(worker->type), worker->thread_num,
497  task_who2str(task));
498  what = TASK_SIGNCONF;
499  when = time_now();
500  break;
501  }
502  /* no error */
503  task->backoff = 0;
 /* a pending interrupt that differs from the computed next task runs
  * first; the computed task and time are parked in halted/halted_when */
504  if (task->interrupt != TASK_NONE && task->interrupt != what) {
505  ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
506  worker2str(worker->type), worker->thread_num,
507  task_what2str(what), task_who2str(task));
508  task->halted = what;
509  task->halted_when = when;
510  task->what = task->interrupt;
511  task->when = time_now();
512  } else {
513  ods_log_debug("[%s[%i]] next task %s for zone %s",
514  worker2str(worker->type), worker->thread_num,
515  task_what2str(what), task_who2str(task));
516  task->what = what;
517  task->when = when;
518  task->interrupt = TASK_NONE;
519  task->halted = TASK_NONE;
520  task->halted_when = 0;
521  }
522  /* backup the last successful run */
523  if (backup) {
524  status = zone_backup2(zone);
525  if (status != ODS_STATUS_OK) {
526  ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
527  worker2str(worker->type), worker->thread_num,
528  task_who2str(task), ods_status2str(status));
529  /* just a warning */
530  status = ODS_STATUS_OK;
531  }
532  backup = 0;
533  }
534  return;
535 
 /* failure path: log (except for XFR_NOT_READY), mark the zone processed
  * and retry the same task after a doubling backoff that starts at 60 and
  * is capped at ODS_SE_MAX_BACKOFF */
536 task_perform_fail:
537  if (status != ODS_STATUS_XFR_NOT_READY) {
538  /* other statuses is critical, and we know it is not ODS_STATUS_OK */
539  ods_log_crit("[%s[%i]] CRITICAL: failed to sign zone %s: %s",
540  worker2str(worker->type), worker->thread_num,
541  task_who2str(task), ods_status2str(status));
542  }
543  /* in case of failure, also mark zone processed (for single run usage) */
544  zone->db->is_processed = 1;
545  if (task->backoff) {
546  task->backoff *= 2;
547  } else {
548  task->backoff = 60;
549  }
550  if (task->backoff > ODS_SE_MAX_BACKOFF) {
551  task->backoff = ODS_SE_MAX_BACKOFF;
552  }
 /* NOTE(review): task->backoff is a time_t per the cross-reference index
  * but is printed with %u here; confirm the format matches the type. */
553  ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds",
554  worker2str(worker->type), worker->thread_num,
555  task_what2str(task->what), task_who2str(task), task->backoff);
556  task->when = time_now() + task->backoff;
557  return;
558 
 /* resume path: put the halted task and its time back and clear the
  * interrupt/halted bookkeeping */
559 task_perform_continue:
560  ods_log_info("[%s[%i]] continue task %s for zone %s",
561  worker2str(worker->type), worker->thread_num,
562  task_what2str(task->halted), task_who2str(task));
563  task->what = task->halted;
564  task->when = task->halted_when;
565  task->interrupt = TASK_NONE;
566  task->halted = TASK_NONE;
567  task->halted_when = 0;
568  return;
569 }
570 
571 
/**
 * Main loop of a WORKER_WORKER thread.
 *
 * Until need_to_exit is set: pop a task from the engine's task queue, take
 * the zone lock, perform the task and hand it back to schedule_task(). When
 * no task is popped, sleep for a timeout derived from the first scheduled
 * task, or for a doubling timeout, capped at ODS_SE_MAX_BACKOFF.
 *
 * \param[in] worker worker of type WORKER_WORKER
 *
 * NOTE(review): listing lines 593, 597, 609, 618-619, 621 and 632 are
 * absent from this listing, so some lock/unlock calls (for example the
 * unlock of zone->zone_lock) are not visible here. Confirm against the
 * real file.
 */
576 static void
577 worker_work(worker_type* worker)
578 {
579  time_t now = 0;
580  time_t timeout = 1;
581  engine_type* engine = NULL;
582  zone_type* zone = NULL;
583  ods_status status = ODS_STATUS_OK;
584 
585  ods_log_assert(worker);
586  ods_log_assert(worker->type == WORKER_WORKER);
587 
588  engine = (engine_type*) worker->engine;
589  while (worker->need_to_exit == 0) {
590  ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
591  worker->thread_num);
592  now = time_now();
 /* NOTE(review): listing line 593 is absent here. */
594  worker->task = schedule_pop_task(engine->taskq);
595  if (worker->task) {
596  worker->working_with = worker->task->what;
 /* NOTE(review): listing line 597 is absent here. */
598  zone = (zone_type*) worker->task->zone;
599 
600  lock_basic_lock(&zone->zone_lock);
601  ods_log_debug("[%s[%i]] start working on zone %s",
602  worker2str(worker->type), worker->thread_num, zone->name);
603  worker->clock_in = time(NULL);
604  worker_perform_task(worker);
 /* the zone keeps the task so it can be rescheduled below */
605  zone->task = worker->task;
606  ods_log_debug("[%s[%i]] finished working on zone %s",
607  worker2str(worker->type), worker->thread_num, zone->name);
608 
 /* NOTE(review): listing line 609 is absent here. */
610  worker->task = NULL;
611  worker->working_with = TASK_NONE;
612  status = schedule_task(engine->taskq, zone->task, 1);
613  if (status != ODS_STATUS_OK) {
614  ods_log_error("[%s[%i]] unable to schedule task for zone %s: "
615  "%s", worker2str(worker->type), worker->thread_num,
616  zone->name, ods_status2str(status));
617  }
 /* NOTE(review): listing lines 618-619 are absent here. */
 /* work was done, so the idle timeout starts over at 1 */
620  timeout = 1;
 /* NOTE(review): listing line 621 is absent here. */
 /* signal the engine when a reload has been requested */
622  lock_basic_lock(&engine->signal_lock);
623  if (engine->need_to_reload) {
624  lock_basic_alarm(&engine->signal_cond);
625  }
626  lock_basic_unlock(&engine->signal_lock);
627 
628  } else {
629  ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
630  worker->thread_num);
 /* peek at the first scheduled task only to compute the sleep time;
  * worker->task is cleared again before sleeping */
631  worker->task = schedule_get_first_task(engine->taskq);
 /* NOTE(review): listing line 632 is absent here. */
633  if (worker->task && !engine->taskq->loading) {
634  timeout = (worker->task->when - now);
635  } else {
636  timeout *= 2;
637  }
638  if (timeout > ODS_SE_MAX_BACKOFF) {
639  timeout = ODS_SE_MAX_BACKOFF;
640  }
641  worker->task = NULL;
642  worker_sleep(worker, timeout);
643  }
644  }
645  return;
646 }
647 
648 
/**
 * Main loop of a WORKER_DRUDGER thread.
 *
 * Until need_to_exit is set: pop an RRset from engine->signq, sign it with
 * rrset_sign() using a lazily created HSM context, and account the result
 * on the worker handed back by fifoq_pop() (called `superior` here) by
 * incrementing its jobs_completed or jobs_failed counter. The superior is
 * woken when all its appointed jobs are accounted for and it is sleeping.
 * The HSM context, if one was created, is destroyed when the loop ends.
 *
 * \param[in] worker worker of type WORKER_DRUDGER with an engine attached
 *
 * NOTE(review): listing lines 682-688 are absent from this listing; see the
 * note at the gap.
 */
653 static void
654 worker_drudge(worker_type* worker)
655 {
656  engine_type* engine = NULL;
657  zone_type* zone = NULL;
658  task_type* task = NULL;
659  rrset_type* rrset = NULL;
660  ods_status status = ODS_STATUS_OK;
661  worker_type* superior = NULL;
662  hsm_ctx_t* ctx = NULL;
663 
664  ods_log_assert(worker);
665  ods_log_assert(worker->engine);
666  ods_log_assert(worker->type == WORKER_DRUDGER);
667 
668  engine = (engine_type*) worker->engine;
669  while (worker->need_to_exit == 0) {
670  ods_log_deeebug("[%s[%i]] report for duty", worker2str(worker->type),
671  worker->thread_num);
672  /* initialize */
673  superior = NULL;
674  zone = NULL;
675  task = NULL;
676  /* get item */
677  lock_basic_lock(&engine->signq->q_lock);
678  rrset = (rrset_type*) fifoq_pop(engine->signq, &superior);
679  if (!rrset) {
680  ods_log_deeebug("[%s[%i]] nothing to do, wait",
681  worker2str(worker->type), worker->thread_num);
 /* NOTE(review): listing lines 682-688 are absent; the next line is the
  * tail of a call whose start is not visible, presumably a
  * lock_basic_sleep() on a signq condition with q_lock. Confirm. */
689  &engine->signq->q_lock, 0);
 /* try once more after waiting */
690  rrset = (rrset_type*) fifoq_pop(engine->signq, &superior);
691  }
692  lock_basic_unlock(&engine->signq->q_lock);
693  /* do some work */
694  if (rrset) {
695  ods_log_assert(superior);
 /* the HSM context is created on first use and reused afterwards */
696  if (!ctx) {
697  ods_log_debug("[%s[%i]] create hsm context",
698  worker2str(worker->type), worker->thread_num);
699  ctx = hsm_create_context();
700  }
701  if (!ctx) {
 /* without a context the job counts as failed and a reload is
  * requested from the engine */
702  ods_log_crit("[%s[%i]] error creating libhsm context",
703  worker2str(worker->type), worker->thread_num);
704  engine->need_to_reload = 1;
705  lock_basic_lock(&superior->worker_lock);
706  superior->jobs_failed++;
707  lock_basic_unlock(&superior->worker_lock);
708  } else {
709  ods_log_assert(ctx);
 /* read the superior's task and zone while its lock is held */
710  lock_basic_lock(&superior->worker_lock);
711  task = superior->task;
712  ods_log_assert(task);
713  zone = task->zone;
714  lock_basic_unlock(&superior->worker_lock);
715  ods_log_assert(zone);
716  ods_log_assert(zone->apex);
717  ods_log_assert(zone->signconf);
718  worker->clock_in = time(NULL);
 /* the superior's clock_in is passed as the signing time */
719  status = rrset_sign(ctx, rrset, superior->clock_in);
720  lock_basic_lock(&superior->worker_lock);
721  if (status == ODS_STATUS_OK) {
722  superior->jobs_completed++;
723  } else {
724  superior->jobs_failed++;
725  }
726  lock_basic_unlock(&superior->worker_lock);
727  }
 /* NOTE(review): the counters and the sleeping flag are read here after
  * superior->worker_lock has been released; confirm this unlocked read
  * is intended. */
728  if (worker_fulfilled(superior) && superior->sleeping) {
729  ods_log_deeebug("[%s[%i]] wake up superior[%u], work is "
730  "done", worker2str(worker->type), worker->thread_num,
731  superior->thread_num);
732  worker_wakeup(superior);
733  }
734  superior = NULL;
735  rrset = NULL;
736  }
737  /* done work */
738  }
739  /* wake up superior */
740  if (superior && superior->sleeping) {
741  ods_log_deeebug("[%s[%i]] wake up superior[%u], i am exiting",
742  worker2str(worker->type), worker->thread_num, superior->thread_num);
743  worker_wakeup(superior);
744  }
745  /* cleanup open HSM sessions */
746  if (ctx) {
747  hsm_destroy_context(ctx);
748  }
749  return;
750 }
751 
752 
/**
 * Start a worker: run the loop that matches its type.
 *
 * WORKER_DRUDGER runs worker_drudge(), WORKER_WORKER runs worker_work();
 * any other type is logged as an error and nothing is run.
 *
 * NOTE(review): the signature line (listing line 758) is absent from this
 * listing; the cross-reference index gives it as
 * `void worker_start(worker_type *worker)`.
 */
757 void
759 {
760  ods_log_assert(worker);
761  switch (worker->type) {
762  case WORKER_DRUDGER:
763  worker_drudge(worker);
764  break;
765  case WORKER_WORKER:
766  worker_work(worker);
767  break;
768  default:
769  ods_log_error("[worker] illegal worker (id=%i)", worker->type);
770  break;
771  }
772  return;
773 }
774 
775 
780 void
781 worker_sleep(worker_type* worker, time_t timeout)
782 {
783  ods_log_assert(worker);
784  if (!worker->need_to_exit) {
785  lock_basic_lock(&worker->worker_lock);
786  worker->sleeping = 1;
787  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
788  timeout);
789  lock_basic_unlock(&worker->worker_lock);
790  }
791  return;
792 }
793 
794 
799 void
800 worker_sleep_unless(worker_type* worker, time_t timeout)
801 {
802  ods_log_assert(worker);
803  lock_basic_lock(&worker->worker_lock);
804  while (!worker->need_to_exit && !worker_fulfilled(worker)) {
805  worker->sleeping = 1;
806  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
807  timeout);
808  ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u "
809  "appointed, %u completed, %u failed", worker2str(worker->type),
810  worker->thread_num, worker->jobs_appointed, worker->jobs_completed,
811  worker->jobs_failed);
812  }
813  lock_basic_unlock(&worker->worker_lock);
814  return;
815 }
816 
817 
/**
 * Wake up a sleeping worker.
 *
 * When the worker is marked sleeping and not waiting, its alarm is raised
 * and the sleeping flag is cleared, both while worker_lock is held.
 *
 * NOTE(review): the signature line (listing line 823) is absent from this
 * listing; the cross-reference index gives it as
 * `void worker_wakeup(worker_type *worker)`.
 */
822 void
824 {
825  ods_log_assert(worker);
 /* NOTE(review): sleeping and waiting are tested before worker_lock is
  * taken; confirm this unlocked read is intended. */
826  if (worker && worker->sleeping && !worker->waiting) {
827  ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
828  worker->thread_num);
829  lock_basic_lock(&worker->worker_lock);
830  lock_basic_alarm(&worker->worker_alarm);
831  worker->sleeping = 0;
832  lock_basic_unlock(&worker->worker_lock);
833  }
834  return;
835 }
836 
837 
842 void
843 worker_wait_timeout(lock_basic_type* lock, cond_basic_type* condition,
844  time_t timeout)
845 {
846  lock_basic_lock(lock);
847  lock_basic_sleep(condition, lock, timeout);
848  lock_basic_unlock(lock);
849  return;
850 }
851 
852 
857 void
858 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
859 {
860  worker_wait_timeout(lock, condition, 0);
861  return;
862 }
863 
864 
869 void
870 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
871 {
872  lock_basic_lock(lock);
873  lock_basic_alarm(condition);
874  lock_basic_unlock(lock);
875  return;
876 }
877 
878 
883 void
884 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
885 {
886  lock_basic_lock(lock);
887  lock_basic_broadcast(condition);
888  lock_basic_unlock(lock);
889  return;
890 }
891 
892 
/**
 * Release a worker.
 *
 * Does nothing for a NULL worker. Otherwise the worker's allocator, alarm
 * and lock are copied into locals, the worker is handed back to the
 * allocator, and the copied lock and alarm are then destroyed.
 *
 * NOTE(review): the signature line (listing line 898) is absent from this
 * listing; the cross-reference index gives it as
 * `void worker_cleanup(worker_type *worker)`.
 * NOTE(review): the lock and alarm that get destroyed are by-value copies
 * taken before deallocation, not the objects that were initialised in
 * worker_create(); confirm this is valid for the lock implementation in
 * use.
 */
897 void
899 {
900  allocator_type* allocator;
901  cond_basic_type worker_cond;
902  lock_basic_type worker_lock;
903  if (!worker) {
904  return;
905  }
 /* copy what is still needed before the worker memory is released */
906  allocator = worker->allocator;
907  worker_cond = worker->worker_alarm;
908  worker_lock = worker->worker_lock;
909  allocator_deallocate(allocator, (void*) worker);
910  lock_basic_destroy(&worker_lock);
911  lock_basic_off(&worker_cond);
912  return;
913 }
#define lock_basic_off(cond)
Definition: locks.h:98
Definition: task.h:41
rrset_type * rrset
Definition: denial.h:54
size_t jobs_completed
Definition: worker.h:59
unsigned waiting
Definition: worker.h:64
unsigned need_to_exit
Definition: worker.h:65
task_type * schedule_get_first_task(schedule_type *schedule)
Definition: schedule.c:245
void ods_log_debug(const char *format,...)
Definition: log.c:270
time_t when
Definition: task.h:59
size_t jobs_appointed
Definition: worker.h:58
lock_basic_type worker_lock
Definition: worker.h:62
#define lock_basic_destroy(lock)
Definition: locks.h:90
cond_basic_type signal_cond
Definition: engine.h:78
cond_basic_type q_threshold
Definition: fifoq.h:66
task_id interrupt
Definition: task.h:57
void * allocator_alloc(allocator_type *allocator, size_t size)
Definition: allocator.c:66
void lhsm_check_connection(void *engine)
Definition: hsm.c:112
ods_status tools_signconf(zone_type *zone)
Definition: tools.c:52
lock_basic_type q_lock
Definition: fifoq.h:65
time_t sig_time
Definition: stats.h:63
void * engine
Definition: worker.h:53
ods_status schedule_task(schedule_type *schedule, task_type *task, int log)
Definition: schedule.c:146
void ods_log_info(const char *format,...)
Definition: log.c:302
const char * task_who2str(task_type *task)
Definition: task.c:176
enum ods_enum_status ods_status
Definition: status.h:90
void worker_start(worker_type *worker)
Definition: worker.c:758
lock_basic_type zone_lock
Definition: zone.h:95
time_t backoff
Definition: task.h:61
void ods_log_error(const char *format,...)
Definition: log.c:334
lock_basic_type stats_lock
Definition: stats.h:67
const char * ods_status2str(ods_status status)
Definition: status.c:111
ldns_rbtree_t * domains
Definition: namedb.h:49
int32_t sig_reuse
Definition: stats.h:62
Definition: task.h:45
void * zone
Definition: task.h:63
ods_lookup_table * ods_lookup_by_id(ods_lookup_table *table, int id)
Definition: status.c:94
rrset_type * next
Definition: rrset.h:73
void worker_cleanup(worker_type *worker)
Definition: worker.c:898
#define lock_basic_set(cond)
Definition: locks.h:94
enum task_id_enum task_id
Definition: task.h:48
ods_status fifoq_push(fifoq_type *q, void *item, worker_type *worker, int *tries)
Definition: fifoq.c:119
void ods_log_crit(const char *format,...)
Definition: log.c:350
int32_t sig_count
Definition: stats.h:60
lock_basic_type signal_lock
Definition: engine.h:79
size_t jobs_failed
Definition: worker.h:60
void worker_wait(lock_basic_type *lock, cond_basic_type *condition)
Definition: worker.c:858
ods_status tools_input(zone_type *zone)
Definition: tools.c:93
task_type * task
Definition: worker.h:54
#define lock_basic_lock(lock)
Definition: locks.h:91
namedb_type * db
Definition: zone.h:86
Definition: task.h:43
void worker_sleep(worker_type *worker, time_t timeout)
Definition: worker.c:781
#define lock_basic_sleep(cond, lock, sleep)
Definition: locks.h:95
time_t halted_when
Definition: task.h:60
time_t clock_in
Definition: worker.h:57
int lock_basic_type
Definition: locks.h:88
task_type * schedule_pop_task(schedule_type *schedule)
Definition: schedule.c:285
unsigned is_processed
Definition: namedb.h:56
ods_status tools_output(zone_type *zone, engine_type *engine)
Definition: tools.c:181
void worker_notify_all(lock_basic_type *lock, cond_basic_type *condition)
Definition: worker.c:884
signconf_type * signconf
Definition: zone.h:84
ods_status zone_backup2(zone_type *zone)
Definition: zone.c:1033
time_t start_time
Definition: stats.h:65
ods_status zone_update_serial(zone_type *zone)
Definition: zone.c:469
task_id halted
Definition: task.h:58
void worker_wakeup(worker_type *worker)
Definition: worker.c:823
enum worker_enum worker_id
Definition: worker.h:46
void worker_sleep_unless(worker_type *worker, time_t timeout)
Definition: worker.c:800
time_t duration2time(duration_type *duration)
Definition: duration.c:371
void ods_log_verbose(const char *format,...)
Definition: log.c:286
time_t last_modified
Definition: signconf.h:78
const char * name
Definition: status.h:95
task_id what
Definition: task.h:56
#define lock_basic_init(lock)
Definition: locks.h:89
int thread_num
Definition: worker.h:51
const char * name
Definition: zone.h:76
schedule_type * taskq
Definition: engine.h:61
ods_status zone_prepare_keys(zone_type *zone)
Definition: zone.c:428
duration_type * sig_resign_interval
Definition: signconf.h:55
cond_basic_type worker_alarm
Definition: worker.h:61
void ods_log_deeebug(const char *format,...)
Definition: log.c:254
worker_type * worker_create(allocator_type *allocator, int num, worker_id type)
Definition: worker.c:72
ods_status rrset_sign(hsm_ctx_t *ctx, rrset_type *rrset, time_t signtime)
Definition: rrset.c:662
void allocator_deallocate(allocator_type *allocator, void *data)
Definition: allocator.c:135
unsigned sleeping
Definition: worker.h:63
task_id working_with
Definition: worker.h:55
lock_basic_type schedule_lock
Definition: schedule.h:63
worker_id type
Definition: worker.h:56
void worker_wait_timeout(lock_basic_type *lock, cond_basic_type *condition, time_t timeout)
Definition: worker.c:843
rrset_type * rrsets
Definition: domain.h:60
void * task
Definition: zone.h:92
fifoq_type * signq
Definition: engine.h:62
cond_basic_type q_nonfull
Definition: fifoq.h:67
#define ods_log_assert(x)
Definition: log.h:154
int need_to_reload
Definition: engine.h:75
int32_t sig_soa_count
Definition: stats.h:61
void * denial
Definition: domain.h:56
#define lock_basic_alarm(cond)
Definition: locks.h:96
#define lock_basic_unlock(lock)
Definition: locks.h:92
void ods_log_warning(const char *format,...)
Definition: log.c:318
allocator_type * allocator
Definition: worker.h:50
ldns_rdf * apex
Definition: zone.h:68
const char * task_what2str(task_id what)
Definition: task.c:146
void worker_notify(lock_basic_type *lock, cond_basic_type *condition)
Definition: worker.c:870
time_t time_now(void)
Definition: duration.c:513
ods_lookup_table worker_str[]
Definition: worker.c:45
stats_type * stats
Definition: zone.h:94
#define lock_basic_broadcast(cond)
Definition: locks.h:97
void * fifoq_pop(fifoq_type *q, worker_type **worker)
Definition: fifoq.c:89