1
2
3 """Generic Pool and MailPool class definitions."""
4
5 import logging
6 log = logging.getLogger("turbomail.pool")
7
8 import math
9 from Queue import Queue, Empty
10 from threading import Event, Thread
11 from turbomail.dispatch import Dispatch
12
13
14 __all__ = ['Pool', 'MailPool']
15
16
18 """A threadpool which checks regularily for new jobs and spawns processes
19 as needed.
20
21 Do not use this class directly. Always subclass and override the worker
22 method.
23 """
24
25 - def __init__(self, interval=10, threads=4, jobs=10, timeout=60, polling=False, **kw):
26 """Initialize the threadpool.
27
28 @param interval: A delay, in seconds, between spawn runs.
29 @type interval: int
30
31 @param threads: The maximum number of concurrent threads.
32 @type threads: int
33
34 @param jobs: The maximum number of jobs a single thread is
35 allowed to handle before dying of old age.
36 @type jobs: int
37
38 @param timeout: The amount of time, in seconds, a thread is
39 allowed to sit idle before dying of starvation.
40 @type timeout: int
41
42 @param polling: Enable or disable the periodic polling
43 mechanism. Disabled, threads will be created,
44 as required, when work is enqueued.
45 @type polling: bool
46 """
47
48 super(Pool, self).__init__()
49
50 self._pool = 0
51 self._queue = Queue()
52 self._finished = Event()
53 self._interval = interval
54 self._threads = threads
55 self._jobs = jobs
56 self._timeout = timeout
57 self._polling = False
58 self._kw = kw
59
60 log.debug("Thread pool created.")
61
62 - def enqueue(self, work, block=True, timeout=None):
63 """Enqueue a Message instance.
64
65 @param work: The unit of work can be any callable that returns a
66 three-item tuple containing the sender and
67 recipient addresses and a properly formatted MIME
68 message, in that order. The preferred type is an
69 instance of the Message class or subclass.
70 @type work: callable
71
72 @param block: Block code execution until there is a free slot in
73 the queue. If I{block} is True and I{timeout} is
74 None, block indefinately.
75 @type block: bool
76
77 @param timeout: How long to block execution (in seconds). If
78 the timeout expires (or block is false and the
79 queue is full) raise the Full exception.
80 @type timeout: int
81 """
82
83 if callable(work):
84 self._queue.put(work(), block=block, timeout=timeout)
85 else:
86 self._queue.put(work, block=block, timeout=timeout)
87
88 optimum_threads = min(self._threads, math.ceil(self._queue.qsize() / float(self._jobs)))
89
90 if not self._polling and not self._queue.empty() and self._pool < optimum_threads:
91 log.debug("Work enqueued. Spawning %d threads." % (optimum_threads - self._pool))
92 for i in range(int(optimum_threads - self._pool)):
93 self.spawn()
94
95 else:
96 log.debug("Work enqueued.")
97
98
100 """Quit the management thread and shutdown the queue."""
101
102 log.debug("Shutdown requested.")
103 self._finished.set()
104
106 thread = Thread(target=self.wrapper)
107 thread.start()
108 self._pool += 1
109
111 """The management thread.
112
113 Do not call directly. Instead, use the I{start} method.
114 """
115
116 log.debug("Beginning thread pool main loop.")
117
118 while True:
119 if self._finished.isSet():
120 log.debug("Shutdown request acknowledged.")
121 break
122
123 if not self._queue.empty():
124 log.debug("Estimate %d work units in the queue." % self._queue.qsize())
125
126 optimum_threads = min(self._threads, math.ceil(self._queue.qsize() / float(self._jobs)))
127
128 if not self._queue.empty() and self._pool < optimum_threads:
129 log.debug("Creating %d threads." % (optimum_threads - self._pool))
130 for i in range(int(optimum_threads - self._pool)):
131 self.spawn()
132
133 self._finished.wait(self._interval)
134
135 log.debug("Thread pool main loop has ended.")
136
138 """Thread wrapper to log and keep track of the active thread count."""
139
140 log.debug("Thread pool worker starting up.")
141
142 self.worker()
143
144 self._pool -= 1
145 log.debug("Thread pool worker finished.")
146
148 """This method must be overridden in a subclass and is used to
149 perform the work of the threadpool.
150
151 Will raise a NotImplementedError exception if not subclassed."""
152
153 raise NotImplementedError
154
155
157 """Mail delivery threadpool.
158
159 This class delivers messages from a queue using the Dispatch class.
160
161 Example usage::
162
163 import turbomail
164 pool = turbomail.MailPool()
165 message = turbomail.Message(
166 "from@localhost",
167 "to@localhost",
168 "Subject"
169 )
170 message.plain = "Hello world!"
171 pool.enqueue(message)
172 # wait for message to send
173 pool.shutdown()
174
175 """
176
178 """Deliver up to I{jobs} messages per queue.
179
180 If there are no messages available in the queue, the worker
181 will wait up to I{timeout} seconds for data. If the timeout
182 expires, the thread will exit gracefully."""
183
184 count = 0
185 dispatch = Dispatch(**self._kw)
186
187 log.debug("Worker starting work.")
188
189 while True:
190 if not count < self._jobs:
191 log.debug("Worker death from old age - spawning child.")
192 self.spawn()
193 break
194
195 try:
196 unit = self._queue.get(True, self._timeout)
197 dispatch(unit)
198
199 except Empty:
200 log.debug("Worker death from starvation.")
201 break
202
203 count += 1
204