Module pprocess
[hide private]
[frames] | no frames]

Source Code for Module pprocess

  1  """ 
  2  A simple parallel processing API for Python, inspired somewhat by the thread 
  3  module, slightly less by pypar, and slightly less still by pypvm. 
  4   
  5  Copyright (C) 2005, 2006, 2007 Paul Boddie <paul@boddie.org.uk> 
  6   
  7  This program is free software; you can redistribute it and/or modify it under 
  8  the terms of the GNU Lesser General Public License as published by the Free 
  9  Software Foundation; either version 3 of the License, or (at your option) any 
 10  later version. 
 11   
 12  This program is distributed in the hope that it will be useful, but WITHOUT 
 13  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 14  FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 15  details. 
 16   
 17  You should have received a copy of the GNU Lesser General Public License along 
 18  with this program.  If not, see <http://www.gnu.org/licenses/>. 
 19  """ 
 20   
 21  __version__ = "0.3.1" 
 22   
 23  import os 
 24  import sys 
 25  import select 
 26  import socket 
 27   
 28  try: 
 29      import cPickle as pickle 
 30  except ImportError: 
 31      import pickle 
 32   
 33  # Communications. 
 34   
class AcknowledgementError(Exception):

    "Raised by Channel.send when the receiver does not reply with \"OK\"."
37
class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent. A 'pid' of 0 denotes the created process's end
        of the channel, for which 'wait' does nothing.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe
        self.closed = 0

    def __del__(self):

        # Since signals don't work well with I/O, the pipes are closed upon
        # finalisation. This does not wait for the created process; see 'wait'.
        # The guard skips instances whose __init__ never completed, which have
        # no 'closed' attribute (or pipes) to work with.

        if hasattr(self, "closed"):
            self.close()

    def close(self):

        "Explicitly close the channel. Closing a closed channel does nothing."

        if not self.closed:
            self.closed = 1

            # Close the write pipe even if closing the read pipe fails.

            try:
                self.read_pipe.close()
            finally:
                self.write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The 'options' are passed
        to os.waitpid (for example os.WNOHANG). Any OSError from os.waitpid is
        ignored.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:
                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel without waiting."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError (with 'obj' as its argument) if the reply is
        anything other than "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. If the
        received object is an exception (as sent by 'start' when the callable
        fails), raise it instead.

        NOTE: unpickling can execute arbitrary code, so channels must only be
        used with trusted peers.
        """

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even when the received object is an
        exception that gets raised, so the sender is always released.
        """

        try:
            return self._receive()
        finally:
            self._send("OK")
# Management of processes and communications.
class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose

        # Deferred (callable, args, kw) computations queued by 'start' while
        # the limit was reached; inserted at the front, popped from the end.

        self.waiting = []

        # Active channels keyed by the file descriptor of their read pipe.

        self.readables = {}

        # Channels removed by the most recent call to 'ready'.

        self.removed = []
        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange, polling its read pipe."

        fd = channel.read_pipe.fileno()
        self.readables[fd] = channel
        self.poller.register(fd, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # list() keeps the documented return type on Python 3, where
        # dict.values() is a view rather than a list.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (in
        milliseconds, as for select.poll; None waits indefinitely) or until
        communication is possible, and return a list of channels which are
        ready to be read from.

        Channels reporting a hang-up or error are removed from the exchange and
        recorded in the 'removed' attribute. Such a channel is only returned as
        readable if 'autoclose' is not in effect.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels, unless they have just been closed.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. With 'autoclose' in
        effect, also close the channel and wait for its created process.
        """

        fd = channel.read_pipe.fileno()
        del self.readables[fd]
        self.poller.unregister(fd)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def start_waiting(self, channel):

        """
        Start a waiting computation, if any, given the reception of data on the
        given 'channel'.

        With 'reuse', the computation's arguments are sent to the process behind
        'channel'; otherwise a new process is started. When reusing and nothing
        is waiting, None is sent to tell the process to quit.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                self.add(start(callable, *args, **kw))

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        """
        Block until some channel is ready; then, for each ready channel, process
        the incoming data and start any waiting computation.
        """

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This base implementation raises NotImplementedError.
        """

        raise NotImplementedError("store_data")

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.

        If the limit on active channels has been reached, the computation is
        queued instead and started later by 'start_waiting'.
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        channel = create()
        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)
340
class ManagedCallable:

    "A callable whose invocations are started and monitored by an exchange."

    def __init__(self, callable, exchange):

        """
        Remember the parallel-aware 'callable' (one that accepts a 'channel'
        parameter) and the 'exchange' that will start it in new processes and
        watch the resulting channels. Wrap other callables with MakeParallel
        before passing them here.
        """

        self.exchange = exchange
        self.callable = callable

    def __call__(self, *args, **kw):

        "Ask the exchange to start the callable with the supplied arguments."

        exchange, target = self.exchange, self.callable
        exchange.start(target, *args, **kw)
# Abstractions and utilities.
class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        # The ordering bookkeeping must exist before Exchange.__init__ runs,
        # since that calls self.add (i.e. Map.add) for any initial 'channels'.

        self.init()
        Exchange.__init__(self, *args, **kw)

        # Channels given to the constructor did not go through 'start' or
        # 'create', so give each of them a result slot.

        self.results.extend([None] * (self.channel_number - len(self.results)))

    def init(self):

        "Remember the channel addition order to order output."

        self.channel_number = 0
        self.channels = {}
        self.results = []

    def add(self, channel):

        """
        Add the given 'channel' to the exchange, recording its position so that
        its result is stored in the corresponding slot of 'results'.
        """

        Exchange.add(self, channel)
        self.channels[channel] = self.channel_number
        self.channel_number += 1

    def start(self, callable, *args, **kw):

        """
        Using pprocess.start, create a new process for the given 'callable'
        using any additional arguments provided. Then, monitor the channel
        created between this process and the created process.
        """

        self.results.append(None) # placeholder
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Using pprocess.create, create a new process and return the created
        communications channel to the created process. In the creating process,
        return None - the channel receiving data from the created process will
        be automatically managed by this exchange.
        """

        self.results.append(None) # placeholder
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence', returning
        this object; indexing or iterating it waits for all results.
        """

        if not isinstance(callable, MakeParallel):
            wrapped = MakeParallel(callable)
        else:
            wrapped = callable

        self.init()

        # Start processes for each element in the sequence.

        for i in sequence:
            self.start(wrapped, i)

        # Access to the results occurs through this object.

        return self

    def __getitem__(self, i):

        "Wait for all computations to finish, then return result 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all computations to finish, then iterate over the results."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        self.results[self.channels[channel]] = data
        del self.channels[channel]
448
class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):
        Exchange.__init__(self, *args, **kw)

        # Received data: new items are inserted at the front and consumed from
        # the end, giving first-received, first-returned order.

        self.queue = []

    def store_data(self, channel):

        "Accumulate the incoming data in order of arrival."

        data = channel.receive()
        self.queue.insert(0, data)

    def __iter__(self):
        return self

    def next(self):

        """
        Return the next element in the queue, blocking while channels remain
        active and nothing has been received. Raise StopIteration once the queue
        is empty and no channels are active.
        """

        while not self.queue and self.active():
            self.store()
        if self.queue:
            return self.queue.pop()
        raise StopIteration

    # Python 3 iterator protocol name; Python 2 uses 'next'.

    __next__ = next
482
class MakeParallel:

    "Adapter giving an ordinary callable a leading 'channel' parameter."

    def __init__(self, callable):

        """
        Keep the given 'callable'. Calling this wrapper with a 'channel' as its
        first argument runs 'callable' on the remaining arguments and sends the
        result back to the invoking process over that channel.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        "Run the wrapped callable and send its result over 'channel'."

        outcome = self.callable(*args, **kw)
        channel.send(outcome)
503
class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion: after each result the process waits for further work.
    """

    def __call__(self, channel, *args, **kw):

        """
        Run the callable on the given arguments and send the result over
        'channel'. Then repeatedly receive an (args, kw) pair, run the callable
        on it and send the result, until None is received.
        """

        task = (args, kw)
        while task is not None:
            args, kw = task
            channel.send(self.callable(*args, **kw))
            task = channel.receive()
# Utility functions.
def create():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    In the created process the returned channel has a 'pid' of 0; in the
    creating process it has the created process's identifier. Raise OSError
    if the process cannot be created.
    """

    parent, child = socket.socketpair()
    for s in [parent, child]:
        s.setblocking(1)

    try:
        pid = os.fork()
    except OSError:
        # No process was created: release both socket ends before propagating.
        parent.close()
        child.close()
        raise

    if pid == 0:
        parent.close()
        end = child
    else:
        child.close()
        end = parent

    # Pickled messages are binary data, so binary modes are used. Buffering is
    # disabled, presumably so that no data sits in a user-space buffer where
    # the poll() in Exchange.ready cannot see it.

    return Channel(pid, end.makefile("rb", 0), end.makefile("wb", 0))
542
def exit(channel):

    """
    Terminate a created process, closing the given 'channel'.

    os._exit is used, so the process ends immediately with status 0 without
    running exit handlers. The process is terminated even if closing the
    channel raises, so a created process can never propagate an exception back
    into the code it was forked from.
    """

    try:
        channel.close()
    finally:
        os._exit(0)
551
def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.
    """

    channel = create()

    # Creating process: simply hand back the channel.

    if channel.pid != 0:
        return channel

    # Created process: run the callable, sending any exception raised to the
    # creating process (the bare except is deliberate so that every failure is
    # reported), then terminate via 'exit'.

    try:
        try:
            callable(channel, *args, **kw)
        except:
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
576
def waitall():

    """
    Wait for all created processes to terminate.

    Return once os.wait fails for a reason other than an interrupted system
    call - normally because no child processes remain.
    """

    import errno

    while 1:
        try:
            os.wait()
        except OSError:
            # Under Python 2 a signal can interrupt os.wait with EINTR; that
            # does not mean the children have gone, so wait again.
            if sys.exc_info()[1].errno != errno.EINTR:
                return
586
def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Each element in the 'sequence' is processed by a different process. The
    returned object is the Map exchange itself (not a list); indexing or
    iterating it waits for, and yields, the results in sequence order.
    """

    return Map(limit=limit)(callable, sequence)
# vim: tabstop=4 expandtab shiftwidth=4