sources for pool.py [rev. unknown]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import Queue 
import threading
import time 
import sys
ERRORMARKER = object() 
class Reply(object): 
    """ reply instances provide access to the result
        of a function execution that got dispatched
        through WorkerPool.dispatch() 
    """
    _excinfo = None 
    def __init__(self, task): 
        self.task = task 
        self._queue = Queue.Queue() 
    def _set(self, result): 
        self._queue.put(result) 
    def _setexcinfo(self, excinfo): 
        self._excinfo = excinfo 
        self._queue.put(ERRORMARKER) 
    def _get_with_timeout(self, timeout): 
        # taken from python2.3's Queue.get() 
        # we want to run on python2.2 here
        delay = 0.0005 # 500 us -> initial delay of 1 ms
        endtime = time.time() + timeout
        while 1:
            try: 
                return self._queue.get_nowait() 
            except Queue.Empty: 
                remaining = endtime - time.time() 
                if remaining <= 0:  #time is over and no element arrived
                    raise IOError("timeout waiting for task %r" %(self.task,))
                delay = min(delay * 2, remaining, .05)
                time.sleep(delay)       #reduce CPU usage by using a sleep
    def get(self, timeout=None): 
        """ get the result object from an asynchronous function execution.
            if the function execution raised an exception, 
            then calling get() will reraise that exception
            including its traceback. 
        """
        if self._queue is None: 
            raise EOFError("reply has already been delivered")
        if timeout is not None: 
            result = self._get_with_timeout(timeout) 
        else: 
            result = self._queue.get() 
        if result is ERRORMARKER: 
            self._queue = None
            excinfo = self._excinfo 
            raise excinfo[0], excinfo[1], excinfo[2]
        return result 
class WorkerThread(threading.Thread): 
    def __init__(self, pool): 
        threading.Thread.__init__(self) 
        self._queue = Queue.Queue() 
        self._pool = pool 
        self.setDaemon(1) 
    def _run_once(self):
        reply = self._queue.get()
        if reply is SystemExit:
            return False
        assert self not in self._pool._ready 
        task = reply.task 
        try: 
            func, args, kwargs = task 
            result = func(*args, **kwargs) 
        except (SystemExit, KeyboardInterrupt): 
            return False
        except: 
            reply._setexcinfo(sys.exc_info()) 
        else: 
            reply._set(result) 
        # at this point, reply, task and all other local variables go away
        return True
    def run(self): 
        try: 
            while self._run_once():
                self._pool._ready[self] = True 
        finally: 
            del self._pool._alive[self]
            try: 
                del self._pool._ready[self]
            except KeyError: 
                pass
    def send(self, task): 
        reply = Reply(task) 
        self._queue.put(reply) 
        return reply 
    def stop(self): 
        self._queue.put(SystemExit)   
        
class WorkerPool(object): 
    """ A WorkerPool allows to dispatch function executions
        to threads.  Each Worker Thread is reused for multiple
        function executions. The dispatching operation 
        takes care to create and dispatch to existing 
        threads. 
        You need to call shutdown() to signal 
        the WorkerThreads to terminate and join() 
        in order to wait until all worker threads
        have terminated. 
    """
    _shuttingdown = False 
    def __init__(self, maxthreads=None): 
        """ init WorkerPool instance which may
            create up to `maxthreads` worker threads. 
        """
        self.maxthreads = maxthreads
        self._ready = {}
        self._alive = {}
    def dispatch(self, func, *args, **kwargs): 
        """ return Reply object for the asynchronous dispatch 
            of the given func(*args, **kwargs) in a 
            separate worker thread. 
        """
        if self._shuttingdown: 
            raise IOError("WorkerPool is already shutting down") 
        try: 
            thread, _ = self._ready.popitem() 
        except KeyError: # pop from empty list
            if self.maxthreads and len(self._alive) >= self.maxthreads: 
                raise IOError("can't create more than %d threads." %
                              (self.maxthreads,))
            thread = self._newthread() 
        return thread.send((func, args, kwargs))
    def _newthread(self): 
        thread = WorkerThread(self) 
        self._alive[thread] = True 
        thread.start() 
        return thread 
    def shutdown(self): 
        """ signal all worker threads to terminate. 
            call join() to wait until all threads termination. 
        """
        if not self._shuttingdown: 
            self._shuttingdown = True 
            for t in self._alive.keys(): 
                t.stop() 
    def join(self, timeout=None): 
        """ wait until all worker threads have terminated. """
        current = threading.currentThread()
        deadline = delta = None 
        if timeout is not None: 
            deadline = time.time() + timeout 
        for thread in self._alive.keys(): 
            if deadline: 
                delta = deadline - time.time() 
                if delta <= 0: 
                    raise IOError("timeout while joining threads") 
            thread.join(timeout=delta) 
            if thread.isAlive(): 
                raise IOError("timeout while joining threads") 
class NamedThreadPool: 
    def __init__(self, **kw): 
        self._namedthreads = {}
        for name, value in kw.items(): 
            self.start(name, value) 
    def __repr__(self): 
        return "<NamedThreadPool %r>" %(self._namedthreads) 
    def get(self, name=None): 
        if name is None: 
            l = []
            for x in self._namedthreads.values(): 
                l.extend(x) 
            return l 
        else: 
            return self._namedthreads.get(name, [])
    def getstarted(self, name=None): 
        return [t for t in self.get(name) if t.isAlive()]
    def prunestopped(self, name=None): 
        if name is None: 
            for name in self.names(): 
                self.prunestopped(name) 
        else: 
            self._namedthreads[name] = self.getstarted(name) 
    def names(self): 
        return self._namedthreads.keys() 
    def start(self, name, func): 
        l = self._namedthreads.setdefault(name, []) 
        thread = threading.Thread(name="%s%d" % (name, len(l)), 
                                  target=func) 
        thread.start() 
        l.append(thread)