Package turbomail :: Module pool
[hide private]
[frames] | no frames]

Source Code for Module turbomail.pool

  1  # encoding: utf-8 
  2   
  3  """Generic Pool and MailPool class definitions.""" 
  4   
  5  import logging 
  6  log = logging.getLogger("turbomail.pool") 
  7   
  8  import math 
  9  from Queue import Queue, Empty 
 10  from threading import Event, Thread 
 11  from turbomail.dispatch import Dispatch 
 12   
 13   
 14  __all__ = ['Pool', 'MailPool'] 
 15   
 16   
17 -class Pool(Thread):
18 """A threadpool which checks regularily for new jobs and spawns processes 19 as needed. 20 21 Do not use this class directly. Always subclass and override the worker 22 method. 23 """ 24
25 - def __init__(self, interval=10, threads=4, jobs=10, timeout=60, polling=False, **kw):
26 """Initialize the threadpool. 27 28 @param interval: A delay, in seconds, between spawn runs. 29 @type interval: int 30 31 @param threads: The maximum number of concurrent threads. 32 @type threads: int 33 34 @param jobs: The maximum number of jobs a single thread is 35 allowed to handle before dying of old age. 36 @type jobs: int 37 38 @param timeout: The amount of time, in seconds, a thread is 39 allowed to sit idle before dying of starvation. 40 @type timeout: int 41 42 @param polling: Enable or disable the periodic polling 43 mechanism. Disabled, threads will be created, 44 as required, when work is enqueued. 45 @type polling: bool 46 """ 47 48 super(Pool, self).__init__() 49 50 self._pool = 0 51 self._queue = Queue() 52 self._finished = Event() 53 self._interval = interval 54 self._threads = threads 55 self._jobs = jobs 56 self._timeout = timeout 57 self._polling = False 58 self._kw = kw 59 60 log.debug("Thread pool created.")
61
62 - def enqueue(self, work, block=True, timeout=None):
63 """Enqueue a Message instance. 64 65 @param work: The unit of work can be any callable that returns a 66 three-item tuple containing the sender and 67 recipient addresses and a properly formatted MIME 68 message, in that order. The preferred type is an 69 instance of the Message class or subclass. 70 @type work: callable 71 72 @param block: Block code execution until there is a free slot in 73 the queue. If I{block} is True and I{timeout} is 74 None, block indefinately. 75 @type block: bool 76 77 @param timeout: How long to block execution (in seconds). If 78 the timeout expires (or block is false and the 79 queue is full) raise the Full exception. 80 @type timeout: int 81 """ 82 83 if callable(work): 84 self._queue.put(work(), block=block, timeout=timeout) 85 else: 86 self._queue.put(work, block=block, timeout=timeout) 87 88 optimum_threads = min(self._threads, math.ceil(self._queue.qsize() / float(self._jobs))) 89 90 if not self._polling and not self._queue.empty() and self._pool < optimum_threads: 91 log.debug("Work enqueued. Spawning %d threads." % (optimum_threads - self._pool)) 92 for i in range(int(optimum_threads - self._pool)): 93 self.spawn() 94 95 else: 96 log.debug("Work enqueued.")
97 98
99 - def shutdown(self):
100 """Quit the management thread and shutdown the queue.""" 101 102 log.debug("Shutdown requested.") 103 self._finished.set()
104
105 - def spawn(self):
106 thread = Thread(target=self.wrapper) 107 thread.start() 108 self._pool += 1
109
110 - def run(self):
111 """The management thread. 112 113 Do not call directly. Instead, use the I{start} method. 114 """ 115 116 log.debug("Beginning thread pool main loop.") 117 118 while True: 119 if self._finished.isSet(): 120 log.debug("Shutdown request acknowledged.") 121 break 122 123 if not self._queue.empty(): 124 log.debug("Estimate %d work units in the queue." % self._queue.qsize()) 125 126 optimum_threads = min(self._threads, math.ceil(self._queue.qsize() / float(self._jobs))) 127 128 if not self._queue.empty() and self._pool < optimum_threads: 129 log.debug("Creating %d threads." % (optimum_threads - self._pool)) 130 for i in range(int(optimum_threads - self._pool)): 131 self.spawn() 132 133 self._finished.wait(self._interval) 134 135 log.debug("Thread pool main loop has ended.")
136
137 - def wrapper(self):
138 """Thread wrapper to log and keep track of the active thread count.""" 139 140 log.debug("Thread pool worker starting up.") 141 142 self.worker() 143 144 self._pool -= 1 145 log.debug("Thread pool worker finished.")
146
147 - def worker(self):
148 """This method must be overridden in a subclass and is used to 149 perform the work of the threadpool. 150 151 Will raise a NotImplementedError exception if not subclassed.""" 152 153 raise NotImplementedError
154 155
156 -class MailPool(Pool):
157 """Mail delivery threadpool. 158 159 This class delivers messages from a queue using the Dispatch class. 160 161 Example usage:: 162 163 import turbomail 164 pool = turbomail.MailPool() 165 message = turbomail.Message( 166 "from@localhost", 167 "to@localhost", 168 "Subject" 169 ) 170 message.plain = "Hello world!" 171 pool.enqueue(message) 172 # wait for message to send 173 pool.shutdown() 174 175 """ 176
177 - def worker(self):
178 """Deliver up to I{jobs} messages per queue. 179 180 If there are no messages available in the queue, the worker 181 will wait up to I{timeout} seconds for data. If the timeout 182 expires, the thread will exit gracefully.""" 183 184 count = 0 185 dispatch = Dispatch(**self._kw) 186 187 log.debug("Worker starting work.") 188 189 while True: 190 if not count < self._jobs: 191 log.debug("Worker death from old age - spawning child.") 192 self.spawn() 193 break 194 195 try: 196 unit = self._queue.get(True, self._timeout) 197 dispatch(unit) 198 199 except Empty: 200 log.debug("Worker death from starvation.") 201 break 202 203 count += 1
204