1 """
2 A simple parallel processing API for Python, inspired somewhat by the thread
3 module, slightly less by pypar, and slightly less still by pypvm.
4
5 Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk>
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU Lesser General Public License as published by the Free
9 Software Foundation; either version 3 of the License, or (at your option) any
10 later version.
11
12 This program is distributed in the hope that it will be useful, but WITHOUT
13 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 details.
16
17 You should have received a copy of the GNU Lesser General Public License along
18 with this program. If not, see <http://www.gnu.org/licenses/>.
19 """
20
21 __version__ = "0.3.1"
22
23 import os
24 import sys
25 import select
26 import socket
27
28 try:
29 import cPickle as pickle
30 except ImportError:
31 import pickle
32
33
34
37
39
40 "A communications channel."
41
def __init__(self, pid, read_pipe, write_pipe):

    """
    Set up a channel for the process identified by 'pid'. Incoming messages
    are read from 'read_pipe'; outgoing messages are written to
    'write_pipe'.
    """

    self.pid = pid
    self.read_pipe, self.write_pipe = read_pipe, write_pipe
    # Flag flipped to 1 once the pipes have been closed.
    self.closed = 0
54
56
57
58
59
60 self.close()
61
63
64 "Explicitly close the channel."
65
66 if not self.closed:
67 self.closed = 1
68 self.read_pipe.close()
69 self.write_pipe.close()
70
71
def wait(self, options=0):

    """
    Wait for the process at the other end of the channel, if any, to exit.
    The 'options' value is handed to os.waitpid unchanged.
    """

    # A pid of 0 identifies the created process itself: there is no child
    # to wait for on this side of the channel.
    if self.pid == 0:
        return
    try:
        os.waitpid(self.pid, options)
    except OSError:
        # Best effort only: the process may already have been reaped.
        pass
81
83
84 "Send the given object 'obj' through the channel."
85
86 pickle.dump(obj, self.write_pipe)
87 self.write_pipe.flush()
88
def send(self, obj):

    """
    Send the given object 'obj' through the channel, then wait for an
    acknowledgement. (The acknowledgement makes the caller wait, thus
    preventing processes from exiting and disrupting the communications
    channel and losing data.)

    Raises AcknowledgementError, carrying 'obj' as its single argument, if
    the reply read back from the channel is anything other than "OK".
    """

    self._send(obj)
    # The peer's receive method answers every received object with "OK".
    if self._receive() != "OK":
        # Instantiate the exception explicitly. The old "raise E, obj" form is
        # Python 2 only and would splat a tuple 'obj' into separate arguments.
        raise AcknowledgementError(obj)
101
103
104 "Receive an object through the channel, returning the object."
105
106 obj = pickle.load(self.read_pipe)
107 if isinstance(obj, Exception):
108 raise obj
109 else:
110 return obj
111
113
114 """
115 Receive an object through the channel, returning the object. Send an
116 acknowledgement of receipt. (The acknowledgement makes the sender wait,
117 thus preventing processes from exiting and disrupting the communications
118 channel and losing data.)
119 """
120
121 try:
122 obj = self._receive()
123 return obj
124 finally:
125 self._send("OK")
126
127
128
130
131 """
132 A communications exchange that can be used to detect channels which are
133 ready to communicate. Subclasses of this class can define the 'store_data'
134 method in order to enable the 'add_wait', 'wait' and 'finish' methods.
135 """
136
def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

    """
    Set up the exchange, optionally populated with the given 'channels'.

    A 'limit', when given, bounds the number of active channels. The
    'add_wait', 'wait' and 'finish' methods observe that bound, and using
    them requires a subclass that provides a working 'store_data' method.

    A true 'reuse' value makes waiting computations run on existing channels
    and processes rather than on newly created ones.

    A false 'autoclose' value leaves channels open when they are removed
    from the exchange; by default, removed channels are closed.
    """

    self.limit, self.reuse, self.autoclose = limit, reuse, autoclose
    # Deferred (callable, args, kw) entries waiting for a free slot.
    self.waiting = []
    # Read-pipe file descriptor -> channel.
    self.readables = {}
    # Channels dropped during the most recent call to 'ready'.
    self.removed = []
    self.poller = select.poll()
    if channels:
        for channel in channels:
            self.add(channel)
164
def add(self, channel):

    "Register the given 'channel' with the exchange so that it is polled."

    fd = channel.read_pipe.fileno()
    self.readables[fd] = channel
    # Watch for incoming data as well as hang-ups and error conditions.
    events = select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR
    self.poller.register(fd, events)
171
173
174 "Return a list of active channels."
175
176 return self.readables.values()
177
def ready(self, timeout=None):

    """
    Poll the registered channels and return a list of those which can be
    read from.

    The optional 'timeout' is handed straight to the poller's poll method
    (for a select.poll object, None blocks and numbers are milliseconds).

    Channels reporting a hang-up, invalid descriptor or error are removed
    from the exchange and recorded in 'self.removed'. Such a channel is still
    returned if it also has data pending, unless autoclose is in effect
    (removal will then have closed it).
    """

    events = self.poller.poll(timeout)
    ready_channels = []
    self.removed = []
    failure_mask = select.POLLHUP | select.POLLNVAL | select.POLLERR

    for fd, status in events:
        channel = self.readables[fd]
        gone = status & failure_mask
        if gone:
            self.remove(channel)
            self.removed.append(channel)
        # Pending data is only worth reading while the channel is still open.
        if status & select.POLLIN and not (gone and self.autoclose):
            ready_channels.append(channel)

    return ready_channels
208
210
211 """
212 Remove the given 'channel' from the exchange.
213 """
214
215 del self.readables[channel.read_pipe.fileno()]
216 self.poller.unregister(channel.read_pipe.fileno())
217 if self.autoclose:
218 channel.close()
219 channel.wait()
220
221
222
224
225 """
226 Add the given 'channel' to the exchange, waiting if the limit on active
227 channels would be exceeded by adding the channel.
228 """
229
230 self.wait()
231 self.add(channel)
232
234
235 """
236 Test for the limit on channels, blocking and reading incoming data until
237 the number of channels is below the limit.
238 """
239
240
241
242 while self.limit is not None and len(self.active()) >= self.limit:
243 self.store()
244
246
247 """
248 Start a waiting process given the reception of data on the given
249 'channel'.
250 """
251
252 if self.waiting:
253 callable, args, kw = self.waiting.pop()
254
255
256
257 if self.reuse:
258
259
260
261
262 self.add(channel)
263 channel.send((args, kw))
264 else:
265 self.add(start(callable, *args, **kw))
266
267
268
269
270 elif self.reuse:
271 channel.send(None)
272
274
275 """
276 Finish the use of the exchange by waiting for all channels to complete.
277 """
278
279 while self.active():
280 self.store()
281
283
284 "For each ready channel, process the incoming data."
285
286 for channel in self.ready():
287 self.store_data(channel)
288 self.start_waiting(channel)
289
291
292 """
293 Store incoming data from the specified 'channel'. In subclasses of this
294 class, such data could be stored using instance attributes.
295 """
296
297 raise NotImplementedError, "store_data"
298
299
300
def start(self, callable, *args, **kw):

    """
    Launch 'callable' with any extra arguments in a new process via the
    module-level start function, and monitor the resulting channel. If the
    exchange is already at its limit, queue the call for later instead.
    """

    limit = self.limit
    if limit is None or len(self.active()) < limit:
        self.add_wait(start(callable, *args, **kw))
    else:
        # Queue at the front; the waiting list is consumed with pop() from
        # the back, so the oldest deferred call runs first.
        self.waiting.insert(0, (callable, args, kw))
314
316
317 """
318 Using pprocess.create, create a new process and return the created
319 communications channel to the created process. In the creating process,
320 return None - the channel receiving data from the created process will
321 be automatically managed by this exchange.
322 """
323
324 channel = create()
325 if channel.pid == 0:
326 return channel
327 else:
328 self.add_wait(channel)
329 return None
330
332
333 """
334 Wrap the given 'callable' in an object which can then be called in the
335 same way as 'callable', but with new processes and communications
336 managed automatically.
337 """
338
339 return ManagedCallable(callable, self)
340
342
343 "A callable managed by an exchange."
344
def __init__(self, callable, exchange):

    """
    Wrap 'callable', using 'exchange' to monitor the channels created
    between this process and the processes it launches. The 'callable'
    must be parallel-aware, that is, accept a 'channel' parameter. Wrap
    other kinds of callable objects with MakeParallel first.
    """

    self.callable, self.exchange = callable, exchange
357
359
360 "Invoke the callable with the supplied arguments."
361
362 self.exchange.start(self.callable, *args, **kw)
363
364
365
366 -class Map(Exchange):
367
368 "An exchange which can be used like the built-in 'map' function."
369
373
375
376 "Remember the channel addition order to order output."
377
378 self.channel_number = 0
379 self.channels = {}
380 self.results = []
381
def add(self, channel):

    "Register 'channel' and remember its position in the output ordering."

    Exchange.add(self, channel)
    # Results are stored by this number, so output follows addition order.
    position = self.channel_number
    self.channels[channel] = position
    self.channel_number = position + 1
389
def start(self, callable, *args, **kw):

    """
    Reserve a result slot, then launch 'callable' with any extra arguments
    in a new process and monitor the channel to it, as Exchange.start does.
    """

    slots = self.results
    slots.append(None)
    Exchange.start(self, callable, *args, **kw)
400
402
403 """
404 Using pprocess.create, create a new process and return the created
405 communications channel to the created process. In the creating process,
406 return None - the channel receiving data from the created process will
407 be automatically managed by this exchange.
408 """
409
410 self.results.append(None)
411 return Exchange.create(self)
412
def __call__(self, callable, sequence):

    """
    Invoke 'callable' once for each element of 'sequence', each in its own
    process, and return the exchange so that results can be read from it.
    """

    # Plain callables need wrapping so that their results are sent back over
    # the channel; parallel-aware wrappers are used as they are.
    if isinstance(callable, MakeParallel):
        wrapped = callable
    else:
        wrapped = MakeParallel(callable)

    self.init()
    for item in sequence:
        self.start(wrapped, item)
    return self
432
434 self.finish()
435 return self.results[i]
436
438 self.finish()
439 return iter(self.results)
440
442
443 "Accumulate the incoming data, associating results with channels."
444
445 data = channel.receive()
446 self.results[self.channels[channel]] = data
447 del self.channels[channel]
448
450
451 """
452 An exchange acting as a queue, making data from created processes available
453 in the order in which it is received.
454 """
455
459
461
462 "Accumulate the incoming data, associating results with channels."
463
464 data = channel.receive()
465 self.queue.insert(0, data)
466
469
471
472 "Return the next element in the queue."
473
474 if self.queue:
475 return self.queue.pop()
476 while self.active():
477 self.store()
478 if self.queue:
479 return self.queue.pop()
480 else:
481 raise StopIteration
482
484
485 "A wrapper around functions making them able to communicate results."
486
488
489 """
490 Initialise the wrapper with the given 'callable'. This object will then
491 be able to accept a 'channel' parameter when invoked, and to forward the
492 result of the given 'callable' via the channel provided back to the
493 invoking process.
494 """
495
496 self.callable = callable
497
def __call__(self, channel, *args, **kw):

    "Call the wrapped callable and send its result back over 'channel'."

    result = self.callable(*args, **kw)
    channel.send(result)
503
505
506 """
507 A wrapper around functions making them able to communicate results in a
508 reusable fashion.
509 """
510
def __call__(self, channel, *args, **kw):

    """
    Call the wrapped callable and send its result over 'channel'. Then keep
    serving further (args, kw) requests received on the channel until a None
    message arrives.
    """

    while True:
        channel.send(self.callable(*args, **kw))
        request = channel.receive()
        # None tells this process that no more work is coming.
        if request is None:
            break
        args, kw = request
521
522
523
525
526 """
527 Create a new process, returning a communications channel to both the
528 creating process and the created process.
529 """
530
531 parent, child = socket.socketpair()
532 for s in [parent, child]:
533 s.setblocking(1)
534
535 pid = os.fork()
536 if pid == 0:
537 parent.close()
538 return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
539 else:
540 child.close()
541 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
542
544
545 """
546 Terminate a created process, closing the given 'channel'.
547 """
548
549 channel.close()
550 os._exit(0)
551
def start(callable, *args, **kw):

    """
    Create a new process which runs the given 'callable', passing any extra
    arguments along to it.

    In the creating process, return the communications channel to the new
    process. In the created process, 'callable' is invoked with the channel
    as its first ('channel') argument so that it can send data back. Any
    exception it raises is forwarded over the channel, and the process then
    terminates via exit.
    """

    channel = create()
    if channel.pid != 0:
        # Creating side: hand back the channel to the new process.
        return channel

    # Created side: never let control return to the caller's code. Every
    # failure is forwarded, hence the deliberately bare except clause.
    try:
        try:
            callable(channel, *args, **kw)
        except:
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
576
578
579 "Wait for all created processes to terminate."
580
581 try:
582 while 1:
583 os.wait()
584 except OSError:
585 pass
586
def pmap(callable, sequence, limit=None):

    """
    Parallel counterpart of the built-in map function, with an optional cap
    'limit' on the number of simultaneous processes. The 'callable' must
    not be parallel-aware (it should not take a 'channel' parameter),
    because it is wrapped for parallel communication before being invoked.

    Return the processed 'sequence', each element having been handled by a
    separate process.
    """

    return Map(limit=limit)(callable, sequence)
601
602
603