1
2
3 """On-demand threaded queue manager.
4
5 Worker threads are spawned based on demand at the time a message is added to the queue."""
6
7
8 import copy
9 import logging
10 import math
11
12 from Queue import Queue, Empty
13 from threading import Event, Thread
14
15 from turbomail.api import Manager
16 from turbomail.exceptions import TransportExhaustedException
17 from turbomail.control import interface
18
19
# Public names exported by this module: only "load".
# NOTE(review): the definition of load is not visible in this excerpt.
20 __all__ = ['load']
21
# Module-wide logger, registered under the "turbomail.manager" channel.
22 log = logging.getLogger("turbomail.manager")
23
24
25
28
29
# Class-level label for this manager.
# NOTE(review): presumably the key used to select this manager; confirm
# against the code that loads managers.
31 name = "demand"
32
# Constructor body (the def line is not visible in this excerpt).
# Logs start-up, runs the base-class initialiser, creates the worker
# bookkeeping state and reads the tuning options from the configuration.
34 log.info("Demand manager starting up.")
35 super(DemandManager, self).__init__()
36
# Count of live worker threads: incremented when a thread is spawned and
# decremented when that thread's wrapper finishes.
37 self.pool = 0
# Unbounded FIFO holding the messages that await background delivery.
38 self.queue = Queue()
# Event that is set when the manager shuts down.
# NOTE(review): presumably checked by the worker loop, which is not
# visible here; confirm.
39 self.finished = Event()
40
# Tuning options; the second argument is the fallback used when the key
# is missing from the configuration.
#   threads: upper bound on the number of worker threads (see optimum).
#   divisor: queued messages per worker thread (see optimum).
#   timeout: not used in the visible code; presumably the idle wait of a
#            worker. TODO confirm in worker().
# NOTE(review): these values feed arithmetic and min() in optimum; if the
# configuration can yield strings they need converting first. Confirm.
41 self.threads = interface.config.get("mail.demand.threads", 4)
42 self.divisor = interface.config.get("mail.demand.divisor", 10)
43 self.timeout = interface.config.get("mail.demand.timeout", 60)
44
45 log.info("Demand manager ready.")
46
# Shut-down body (the def line is not visible in this excerpt).
# Only raises the "finished" event; the visible lines neither join nor wait
# for worker threads that are still running.
48 log.info("Demand manager shutting down.")
49 self.finished.set()
50
# Body of spawn (the def line is not visible in this excerpt).
# Starts one worker thread whose entry point is self.wrapper and counts it
# in the pool.
# NOTE(review): the counter is bumped after start() and no lock is visible,
# while wrapper decrements it from the worker thread. Confirm this
# unsynchronised update is acceptable.
52 thread = Thread(target=self.wrapper)
53 thread.start()
54 self.pool += 1
55
# Body that queues a message for background delivery (the def line is not
# visible in this excerpt; the code uses an argument called "message").
# Always returns True.
57 log.info("Adding message %s to the queue for background delivery." % message.id)
# A deep copy is queued, so the flag changes made to the caller's object
# below are not applied to the queued copy.
58 self.queue.put(copy.deepcopy(message))
# Flag the caller's message object.
# NOTE(review): the meaning of _processed and _dirty is defined outside this
# excerpt; confirm before relying on them.
59 message._processed = True
60 message._dirty = True
61
# Top the pool up to the current optimum, but only while work is still
# queued and fewer workers than the optimum are alive.
62 if not self.queue.empty() and self.pool < self.optimum:
# optimum may be a float (math.ceil result), hence the int() conversion.
63 tospawn = int(self.optimum - self.pool)
# The and/or expression yields "s" unless exactly one thread is spawned.
64 log.debug("Spawning %d thread%s." % (tospawn, tospawn != 1 and "s" or ""))
65 for i in range(tospawn):
66 self.spawn()
67
68 return True
69
# Body of wrapper, the entry point given to each Thread by spawn (the def
# line is not visible in this excerpt).
# Runs worker() and then removes this thread from the pool count.
71 log.debug("Mail queue worker starting up.")
72
73 try:
74 self.worker()
# Bare except: any error from worker() is logged with its traceback instead
# of propagating, so the pool count below is still decremented.
# NOTE(review): a bare except also intercepts SystemExit and
# KeyboardInterrupt; confirm that is intended.
75 except:
76 log.exception("Internal error in worker thread.")
77
# This thread is finishing, so drop it from the live-worker count.
78 self.pool -= 1
79 log.debug("Mail queue worker finished.")
80
107
# Body of optimum (the def line is not visible in this excerpt).
# Desired number of worker threads: one per "divisor" queued messages,
# rounded up, and never more than the configured "threads" limit.
# float() forces true division so that ceil can round partial batches up.
109 return min(self.threads, math.ceil(self.queue.qsize() / float(self.divisor)))
110
# Expose the function as a read-only property; only a getter is supplied.
111 optimum = property(optimum)
112