Package turbomail :: Module pool
[hide private]
[frames | no frames]

Source Code for Module turbomail.pool

  1  # encoding: utf-8 
  2   
  3  """Generic Pool and MailPool class definitions.""" 
  4   
  5  import logging 
  6  import smtplib 
  7  import socket 
  8  import time 
  9  log = logging.getLogger("turbomail.pool") 
 10   
 11  import math 
 12  from Queue import Queue, Empty 
 13  from threading import Event, Thread 
 14  from turbomail.dispatch import Dispatch 
 15   
 16   
 17  __all__ = ['Pool', 'MailPool'] 
 18   
 19   
class Pool(Thread):
    """A thread pool which checks regularly for new jobs and spawns worker
    threads as needed.

    Do not use this class directly.  Always subclass and override the
    I{worker} method.
    """

    def __init__(self, interval=10, threads=4, jobs=10, timeout=60, polling=False, testmode=False, **kw):
        """Initialize the thread pool.

        @param interval: A delay, in seconds, between spawn runs.
        @type interval: int

        @param threads: The maximum number of concurrent threads.
        @type threads: int

        @param jobs: The maximum number of jobs a single thread is
                     allowed to handle before dying of old age.
        @type jobs: int

        @param timeout: The amount of time, in seconds, a thread is
                        allowed to sit idle before dying of starvation.
        @type timeout: int

        @param polling: Enable or disable the periodic polling
                        mechanism.  Disabled, threads will be created,
                        as required, when work is enqueued.
        @type polling: bool

        @param testmode: Enable or disable the unit testing mode.
                         Enabled, no mail is actually sent but only
                         stored in the queue for later retrieval.
        @type testmode: bool

        @param kw: Any additional keyword arguments; they are stored
                   unchanged on the instance for use by subclasses.
        """

        super(Pool, self).__init__()

        # Count of worker threads currently accounted for: raised by
        # spawn() and lowered by wrapper().
        # NOTE(review): this counter is updated from several threads and no
        # lock is visible here -- confirm whether drift under contention
        # matters for your deployment.
        self._pool = 0
        self._queue = Queue()
        self._finished = Event()
        self._interval = interval
        self._threads = threads
        self._jobs = jobs
        self._timeout = timeout
        self._polling = polling
        self._testmode = testmode
        self._kw = kw

        log.debug("Thread pool created.")

    def _thread_shortfall(self):
        """Return how many additional workers the current backlog calls for.

        The target is one worker per I{jobs} queued units (rounded up),
        capped at I{threads}.  The result is never negative and is always
        an integer, so it can be fed straight to C{range()}.

        @rtype: int
        """

        backlog = self._queue.qsize()
        if not backlog:
            return 0

        # float() keeps the division exact on Python 2; int() is needed
        # because math.ceil returns a float there.
        wanted = min(self._threads, int(math.ceil(backlog / float(self._jobs))))
        return max(0, wanted - self._pool)

    def enqueue(self, work, block=True, timeout=None):
        """Enqueue a unit of work, normally a Message instance.

        @param work: The unit of work can be any callable that returns a
                     three-item tuple containing the sender and
                     recipient addresses and a properly formatted MIME
                     message, in that order.  The preferred type is an
                     instance of the Message class or subclass.  A
                     callable is invoked immediately and its return
                     value is what gets queued; anything else is queued
                     as-is.
        @type work: callable

        @param block: Block code execution until there is a free slot in
                      the queue.  If I{block} is True and I{timeout} is
                      None, block indefinitely.
        @type block: bool

        @param timeout: How long to block execution (in seconds).  If
                        the timeout expires (or block is false and the
                        queue is full) raise the Full exception.
        @type timeout: int
        """

        if callable(work):
            unit = work()
        else:
            unit = work

        self._queue.put(unit, block=block, timeout=timeout)

        if self._testmode:
            # Leave the unit in the queue for inspection; never start workers.
            log.debug("Testmode enabled, not sending mail.")
            return

        # With polling enabled the management thread (run) does the spawning.
        shortfall = 0
        if not self._polling:
            shortfall = self._thread_shortfall()

        if shortfall:
            log.debug("Work enqueued. Spawning %d threads.", shortfall)
            for _ in range(shortfall):
                self.spawn()

        else:
            log.debug("Work enqueued.")

    def shutdown(self):
        """Ask the management thread to quit.

        This only sets the internal "finished" event, which the main loop
        in I{run} checks; worker threads are not interrupted here.
        """

        log.debug("Shutdown requested.")
        self._finished.set()

    def spawn(self):
        """Start one worker thread running I{wrapper} and count it."""

        thread = Thread(target=self.wrapper)
        thread.start()
        # Counted only after a successful start so a failed start() does not
        # leave a phantom worker in the tally.
        self._pool += 1

    def run(self):
        """The management thread.

        Wakes up every I{interval} seconds (or as soon as shutdown is
        requested) and spawns workers if the backlog calls for more.

        Do not call directly.  Instead, use the I{start} method.
        """

        log.debug("Beginning thread pool main loop.")

        while True:
            if self._finished.isSet():
                log.debug("Shutdown request acknowledged.")
                break

            if not self._queue.empty():
                log.debug("Estimate %d work units in the queue.", self._queue.qsize())

            shortfall = self._thread_shortfall()
            if shortfall:
                log.debug("Creating %d threads.", shortfall)
                for _ in range(shortfall):
                    self.spawn()

            # Sleeps for the interval but returns early once shutdown() sets
            # the event.
            self._finished.wait(self._interval)

        log.debug("Thread pool main loop has ended.")

    def wrapper(self):
        """Thread wrapper to log and keep track of the active thread count.

        Any exception raised by I{worker} still propagates, but the thread
        is always removed from the tally first.
        """

        log.debug("Thread pool worker starting up.")

        try:
            self.worker()
        finally:
            # Must run even when worker() raises; otherwise the count stays
            # inflated and the pool may stop spawning replacements.
            self._pool -= 1

        log.debug("Thread pool worker finished.")

    def worker(self):
        """This method must be overridden in a subclass and is used to
        perform the work of the thread pool.

        Will raise a NotImplementedError exception if not subclassed."""

        raise NotImplementedError
167 168
class MailPool(Pool):
    """Mail delivery thread pool.

    This class delivers messages from a queue using the Dispatch class.

    Example usage::

        import turbomail
        pool = turbomail.MailPool()
        message = turbomail.Message(
                "from@localhost",
                "to@localhost",
                "Subject"
            )
        message.plain = "Hello world!"
        pool.enqueue(message)
        # wait for message to send
        pool.shutdown()

    """

    def worker(self):
        """Deliver up to I{jobs} messages per worker thread.

        If there are no messages available in the queue, the worker
        will wait up to I{timeout} seconds for data.  If the timeout
        expires, the thread will exit gracefully.

        A refused recipient list is logged and the message dropped.  Any
        other SMTP or socket failure puts the message back on the queue
        and ends this worker."""

        count = 0
        # One Dispatch per worker, built from the keyword arguments the pool
        # was constructed with.
        dispatch = Dispatch(**self._kw)

        log.debug("Worker starting work.")

        while True:
            if not count < self._jobs:
                # Hand over to a fresh thread before this one exits.
                log.debug("Worker death from old age - spawning child.")
                self.spawn()
                break

            try:
                unit = self._queue.get(True, self._timeout)

            except Empty:
                log.debug("Worker death from starvation.")
                break

            try:
                dispatch(unit)

            # Must precede the generic handler: SMTPRecipientsRefused is a
            # subclass of SMTPException.
            except smtplib.SMTPRecipientsRefused as exception:
                log.exception(exception)
                log.error("SMTP server refused delivery. Not retrying.")

            except (socket.error, smtplib.SMTPException) as exception:
                log.exception(exception)
                log.error("Exception occured when sending mail. Retrying...")
                time.sleep(1)
                # NOTE(review): the unit is re-queued but no replacement
                # worker is spawned here; presumably a later enqueue() or the
                # polling loop picks it up -- confirm.
                self._queue.put(unit, block=False)
                log.debug("Worker death by exception.")
                break

            count += 1
230