1
2
3 """Generic Pool and MailPool class definitions."""
4
5 import logging
6 import smtplib
7 import socket
8 import time
9 log = logging.getLogger("turbomail.pool")
10
11 import math
12 from Queue import Queue, Empty
13 from threading import Event, Thread
14 from turbomail.dispatch import Dispatch
15
16
17 __all__ = ['Pool', 'MailPool']
18
19
21 """A thread pool which checks regularly for new jobs and spawns processes
22 as needed.
23
24 Do not use this class directly. Always subclass and override the worker
25 method.
26 """
27
28 - def __init__(self, interval=10, threads=4, jobs=10, timeout=60, polling=False, testmode=False, **kw):
29 """Initialize the thread pool.
30
31 @param interval: A delay, in seconds, between spawn runs.
32 @type interval: int
33
34 @param threads: The maximum number of concurrent threads.
35 @type threads: int
36
37 @param jobs: The maximum number of jobs a single thread is
38 allowed to handle before dying of old age.
39 @type jobs: int
40
41 @param timeout: The amount of time, in seconds, a thread is
42 allowed to sit idle before dying of starvation.
43 @type timeout: int
44
45 @param polling: Enable or disable the periodic polling
46 mechanism. Disabled, threads will be created,
47 as required, when work is enqueued.
48 @type polling: bool
49
50 @param testmode: Enable or disable the unit testing mode.
51 Disabled, no mail is actually sent but only
52 stored in the queue for later retrieval.
53 @type testmode: bool
54 """
55
56 super(Pool, self).__init__()
57
58 self._pool = 0
59 self._queue = Queue()
60 self._finished = Event()
61 self._interval = interval
62 self._threads = threads
63 self._jobs = jobs
64 self._timeout = timeout
65 self._polling = polling
66 self._testmode = testmode
67 self._kw = kw
68
69 log.debug("Thread pool created.")
70
71 - def enqueue(self, work, block=True, timeout=None):
72 """Enqueue a Message instance.
73
74 @param work: The unit of work can be any callable that returns a
75 three-item tuple containing the sender and
76 recipient addresses and a properly formatted MIME
77 message, in that order. The preferred type is an
78 instance of the Message class or subclass.
79 @type work: callable
80
81 @param block: Block code execution until there is a free slot in
82 the queue. If I{block} is True and I{timeout} is
83 None, block indefinitely.
84 @type block: bool
85
86 @param timeout: How long to block execution (in seconds). If
87 the timeout expires (or block is false and the
88 queue is full) raise the Full exception.
89 @type timeout: int
90 """
91
92 if callable(work):
93 self._queue.put(work(), block=block, timeout=timeout)
94 else:
95 self._queue.put(work, block=block, timeout=timeout)
96
97 if self._testmode:
98 log.debug("Testmode enabled, not sending mail.")
99 return
100
101 optimum_threads = min(self._threads, math.ceil(self._queue.qsize() / float(self._jobs)))
102
103 if not self._polling and not self._queue.empty() and self._pool < optimum_threads:
104 log.debug("Work enqueued. Spawning %d threads." % (optimum_threads - self._pool))
105 for i in range(int(optimum_threads - self._pool)):
106 self.spawn()
107
108 else:
109 log.debug("Work enqueued.")
110
111
113 """Quit the management thread and shutdown the queue."""
114
115 log.debug("Shutdown requested.")
116 self._finished.set()
117
119 thread = Thread(target=self.wrapper)
120 thread.start()
121 self._pool += 1
122
124 """The management thread.
125
126 Do not call directly. Instead, use the I{start} method.
127 """
128
129 log.debug("Beginning thread pool main loop.")
130
131 while True:
132 if self._finished.isSet():
133 log.debug("Shutdown request acknowledged.")
134 break
135
136 if not self._queue.empty():
137 log.debug("Estimate %d work units in the queue." % self._queue.qsize())
138
139 optimum_threads = min(self._threads, math.ceil(self._queue.qsize() / float(self._jobs)))
140
141 if not self._queue.empty() and self._pool < optimum_threads:
142 log.debug("Creating %d threads." % (optimum_threads - self._pool))
143 for i in range(int(optimum_threads - self._pool)):
144 self.spawn()
145
146 self._finished.wait(self._interval)
147
148 log.debug("Thread pool main loop has ended.")
149
151 """Thread wrapper to log and keep track of the active thread count."""
152
153 log.debug("Thread pool worker starting up.")
154
155 self.worker()
156
157 self._pool -= 1
158 log.debug("Thread pool worker finished.")
159
161 """This method must be overridden in a subclass and is used to
162 perform the work of the thread pool.
163
164 Will raise a NotImplementedError exception if not subclassed."""
165
166 raise NotImplementedError
167
168
170 """Mail delivery thread pool.
171
172 This class delivers messages from a queue using the Dispatch class.
173
174 Example usage::
175
176 import turbomail
177 pool = turbomail.MailPool()
178 message = turbomail.Message(
179 "from@localhost",
180 "to@localhost",
181 "Subject"
182 )
183 message.plain = "Hello world!"
184 pool.enqueue(message)
185 # wait for message to send
186 pool.shutdown()
187
188 """
189
191 """Deliver up to I{jobs} messages per queue.
192
193 If there are no messages available in the queue, the worker
194 will wait up to I{timeout} seconds for data. If the timeout
195 expires, the thread will exit gracefully."""
196
197 count = 0
198 dispatch = Dispatch(**self._kw)
199
200 log.debug("Worker starting work.")
201
202 while True:
203 if not count < self._jobs:
204 log.debug("Worker death from old age - spawning child.")
205 self.spawn()
206 break
207
208 unit = None
209 try:
210 unit = self._queue.get(True, self._timeout)
211 dispatch(unit)
212
213 except Empty:
214 log.debug("Worker death from starvation.")
215 break
216
217 except smtplib.SMTPRecipientsRefused, exception:
218 log.exception(exception)
219 log.error("SMTP server refused delivery. Not retrying.")
220
221 except (socket.error, smtplib.SMTPException), exception:
222 log.exception(exception)
223 log.error("Exception occured when sending mail. Retrying...")
224 time.sleep(1)
225 self._queue.put(unit, block=False)
226 log.debug("Worker death by exception.")
227 break
228
229 count += 1
230