Module pprocess
[hide private]
[frames | no frames]

Source Code for Module pprocess

   1  """ 
   2  A simple parallel processing API for Python, inspired somewhat by the thread 
   3  module, slightly less by pypar, and slightly less still by pypvm. 
   4   
   5  Copyright (C) 2005, 2006, 2007, 2008 Paul Boddie <paul@boddie.org.uk> 
   6   
   7  This program is free software; you can redistribute it and/or modify it under 
   8  the terms of the GNU Lesser General Public License as published by the Free 
   9  Software Foundation; either version 3 of the License, or (at your option) any 
  10  later version. 
  11   
  12  This program is distributed in the hope that it will be useful, but WITHOUT 
  13  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  14  FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
  15  details. 
  16   
  17  You should have received a copy of the GNU Lesser General Public License along 
  18  with this program.  If not, see <http://www.gnu.org/licenses/>. 
  19  """ 
  20   
  21  __version__ = "0.4" 
  22   
  23  import os 
  24  import sys 
  25  import select 
  26  import socket 
  27  import platform 
  28   
  29  try: 
  30      import cPickle as pickle 
  31  except ImportError: 
  32      import pickle 
  33   
  34  try: 
  35      set 
  36  except NameError: 
  37      from sets import Set as set 
  38   
  39  # Communications. 
  40   
class AcknowledgementError(Exception):

    """
    Raised by a channel's 'send' method when the reply received for a sent
    object is anything other than the "OK" acknowledgement.
    """
43
class Channel:

    "A communications channel."

    def __init__(self, pid, read_pipe, write_pipe):

        """
        Initialise the channel with a process identifier 'pid', a 'read_pipe'
        from which messages will be received, and a 'write_pipe' into which
        messages will be sent.
        """

        self.pid = pid
        self.read_pipe = read_pipe
        self.write_pipe = write_pipe

    def __del__(self):

        # Since signals don't work well with I/O, the pipes are closed upon
        # finalisation. The created process is not waited for here: use the
        # 'wait' method for that.

        self.close()

    def close(self):

        """
        Explicitly close the channel. Both pipes are forgotten before being
        closed, so that calling this method again is harmless, and the write
        pipe is closed even if closing the read pipe raises an exception.
        """

        read_pipe, self.read_pipe = self.read_pipe, None
        write_pipe, self.write_pipe = self.write_pipe, None

        try:
            if read_pipe is not None:
                read_pipe.close()
        finally:
            if write_pipe is not None:
                write_pipe.close()

    def wait(self, options=0):

        """
        Wait for the created process, if any, to exit. The given 'options' are
        passed to os.waitpid. Nothing is done where the 'pid' of this channel is
        zero.
        """

        if self.pid != 0:
            try:
                os.waitpid(self.pid, options)
            except OSError:

                # Best effort: failure to wait for the process is ignored.

                pass

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        pickle.dump(obj, self.write_pipe)
        self.write_pipe.flush()

    def send(self, obj):

        """
        Send the given object 'obj' through the channel. Then wait for an
        acknowledgement. (The acknowledgement makes the caller wait, thus
        preventing processes from exiting and disrupting the communications
        channel and losing data.)

        Raise AcknowledgementError, with 'obj' as its argument, where the reply
        is not "OK".
        """

        self._send(obj)
        if self._receive() != "OK":
            raise AcknowledgementError(obj)

    def _receive(self):

        """
        Receive an object through the channel, returning the object. Where the
        received object is an exception instance, it is raised instead of being
        returned.
        """

        # NOTE: unpickling can execute arbitrary code, so a channel must only
        # NOTE: ever be connected to a trusted peer.

        obj = pickle.load(self.read_pipe)
        if isinstance(obj, Exception):
            raise obj
        else:
            return obj

    def receive(self):

        """
        Receive an object through the channel, returning the object. Send an
        acknowledgement of receipt. (The acknowledgement makes the sender wait,
        thus preventing processes from exiting and disrupting the communications
        channel and losing data.)

        The acknowledgement is sent even where receiving raises an exception.
        """

        try:
            obj = self._receive()
            return obj
        finally:
            self._send("OK")
133
class PersistentChannel(Channel):

    """
    A persistent communications channel which can handle peer disconnection,
    acting as a server, meaning that this channel is associated with a specific
    address which can be contacted by other processes.
    """

    def __init__(self, pid, endpoint, address):

        """
        Initialise the channel with a process identifier 'pid', a socket
        'endpoint' on which connections will be accepted, and the 'address'
        (a filename which is removed when the channel is closed).
        """

        Channel.__init__(self, pid, None, None)
        self.endpoint = endpoint
        self.address = address
        self.poller = select.poll()

        # Listen for connections before this process is interested in
        # communicating. It is not desirable to wait for connections at this
        # point because this will block the process.

        self.endpoint.listen(1)

    def close(self):

        "Close the persistent channel and remove the socket file."

        Channel.close(self)
        try:
            os.unlink(self.address)
        except OSError:

            # Best effort: failure to remove the file is ignored.

            pass

    def _ensure_pipes(self):

        """
        Ensure that the channel is capable of communicating, accepting a
        connection (and thus potentially blocking) where no pipes are present.
        """

        if self.read_pipe is None or self.write_pipe is None:

            # Accept any incoming connections.

            endpoint, address = self.endpoint.accept()
            self.read_pipe = endpoint.makefile("r", 0)
            self.write_pipe = endpoint.makefile("w", 0)

            # Monitor the write pipe for error conditions.

            fileno = self.write_pipe.fileno()
            self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def _reset_pipes(self):

        """
        Discard the existing connection, closing its pipes explicitly instead of
        leaving their release to garbage collection.
        """

        fileno = self.write_pipe.fileno()
        self.poller.unregister(fileno)

        stale = (self.read_pipe, self.write_pipe)
        self.read_pipe = None
        self.write_pipe = None

        for pipe in stale:
            if pipe is None:
                continue
            try:
                pipe.close()
            except (IOError, OSError, socket.error):

                # The connection is being discarded because it is broken, so
                # errors raised when closing it are of no interest.

                pass

        self.endpoint.listen(1)

    def _ensure_communication(self, timeout=None):

        """
        Ensure that sending and receiving are possible, passing the optional
        'timeout' to each poll of the write pipe.
        """

        while 1:
            self._ensure_pipes()
            fileno = self.write_pipe.fileno()
            fds = self.poller.poll(timeout)
            for fd, status in fds:
                if fd != fileno:
                    continue
                if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):

                    # Broken connection: discard it and start all over again.

                    self._reset_pipes()
                    break
            else:

                # No error condition was reported for the write pipe.

                return

    def _send(self, obj):

        "Send the given object 'obj' through the channel."

        self._ensure_communication()
        Channel._send(self, obj)

    def _receive(self):

        "Receive an object through the channel, returning the object."

        self._ensure_communication()
        return Channel._receive(self)
# Management of processes and communications.
class Exchange:

    """
    A communications exchange that can be used to detect channels which are
    ready to communicate. Subclasses of this class can define the 'store_data'
    method in order to enable the 'add_wait', 'wait' and 'finish' methods.
    """

    def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

        """
        Initialise the exchange with an optional list of 'channels'.

        If the optional 'limit' is specified, restrictions on the addition of
        new channels can be enforced and observed through the 'add_wait', 'wait'
        and 'finish' methods. To make use of these methods, create a subclass of
        this class and define a working 'store_data' method.

        If the optional 'reuse' parameter is set to a true value, channels and
        processes will be reused for waiting computations.

        If the optional 'autoclose' parameter is set to a false value, channels
        will not be closed automatically when they are removed from the exchange
        - by default they are closed when removed.
        """

        self.limit = limit
        self.reuse = reuse
        self.autoclose = autoclose

        # Queued (callable, args, kw) details of work not yet started.

        self.waiting = []

        # A mapping from read pipe file descriptors to channels.

        self.readables = {}

        # The channels removed during the most recent call to 'ready'.

        self.removed = []

        self.poller = select.poll()
        for channel in channels or []:
            self.add(channel)

    def add(self, channel):

        "Add the given 'channel' to the exchange."

        fileno = channel.read_pipe.fileno()
        self.readables[fileno] = channel
        self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)

    def active(self):

        "Return a list of active channels."

        # A genuine list is returned regardless of what 'values' produces.

        return list(self.readables.values())

    def ready(self, timeout=None):

        """
        Wait for a period of time specified by the optional 'timeout' (or until
        communication is possible) and return a list of channels which are ready
        to be read from.

        Channels reporting a hang-up or an error are removed from the exchange
        and recorded in the 'removed' attribute. Where 'autoclose' is set, such
        channels are not returned even if they are also readable.
        """

        fds = self.poller.poll(timeout)
        readables = []
        self.removed = []

        for fd, status in fds:
            channel = self.readables[fd]
            removed = 0

            # Remove ended/error channels.

            if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
                self.remove(channel)
                self.removed.append(channel)
                removed = 1

            # Record readable channels.

            if status & select.POLLIN:
                if not (removed and self.autoclose):
                    readables.append(channel)

        return readables

    def remove(self, channel):

        """
        Remove the given 'channel' from the exchange. Where 'autoclose' is set,
        the channel is also closed and its process waited for.
        """

        fileno = channel.read_pipe.fileno()
        del self.readables[fileno]
        self.poller.unregister(fileno)
        if self.autoclose:
            channel.close()
            channel.wait()

    # Enhanced exchange methods involving channel limits.

    def add_wait(self, channel):

        """
        Add the given 'channel' to the exchange, waiting if the limit on active
        channels would be exceeded by adding the channel.
        """

        self.wait()
        self.add(channel)

    def wait(self):

        """
        Test for the limit on channels, blocking and reading incoming data until
        the number of channels is below the limit.
        """

        # If limited, block until channels have been closed.

        while self.limit is not None and len(self.active()) >= self.limit:
            self.store()

    def _get_waiting(self, channel):

        """
        Get waiting callable and argument information for new processes, given
        the reception of data on the given 'channel'. Return a tuple of the form
        (callable, args, kw) where a new process is needed, or None otherwise.
        """

        if self.waiting:
            callable, args, kw = self.waiting.pop()

            # Try and reuse existing channels if possible.

            if self.reuse:

                # Re-add the channel - this may update information related to
                # the channel in subclasses.

                self.add(channel)
                channel.send((args, kw))
            else:
                return callable, args, kw

        # Where channels are being reused, but where no processes are waiting
        # any more, send a special value to tell them to quit.

        elif self.reuse:
            channel.send(None)

        return None

    def finish(self):

        """
        Finish the use of the exchange by waiting for all channels to complete.
        """

        while self.active():
            self.store()

    def store(self):

        "For each ready channel, process the incoming data."

        for channel in self.ready():
            self.store_data(channel)
            self.start_waiting(channel)

    def store_data(self, channel):

        """
        Store incoming data from the specified 'channel'. In subclasses of this
        class, such data could be stored using instance attributes.

        This implementation always raises NotImplementedError.
        """

        raise NotImplementedError("store_data")

    # Support for the convenience methods.

    def _set_waiting(self, callable, args, kw):

        """
        Support process creation by returning whether the given 'callable' has
        been queued for later invocation (1 where it was queued with 'args' and
        'kw' because the limit has been reached, 0 otherwise).
        """

        if self.limit is not None and len(self.active()) >= self.limit:
            self.waiting.insert(0, (callable, args, kw))
            return 1
        else:
            return 0

    def _get_channel_for_process(self, channel):

        """
        Support process creation by returning the given 'channel' to the
        created process, and None to the creating process, for which the channel
        is added to this exchange.
        """

        if channel.pid == 0:
            return channel
        else:
            self.add_wait(channel)
            return None

    # Methods for overriding, related to the convenience methods.

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel'.
        """

        details = self._get_waiting(channel)
        if details is not None:
            callable, args, kw = details
            self.add(start(callable, *args, **kw))

    # Convenience methods.

    def start(self, callable, *args, **kw):

        """
        Create a new process for the given 'callable' using any additional
        arguments provided. Then, monitor the channel created between this
        process and the created process.

        Where the limit on channels has been reached, the work is queued instead
        and no process is created by this call.
        """

        if self._set_waiting(callable, args, kw):
            return

        self.add_wait(start(callable, *args, **kw))

    def create(self):

        """
        Create a new process and return the created communications channel to
        the created process. In the creating process, return None - the channel
        receiving data from the created process will be automatically managed by
        this exchange.
        """

        channel = create()
        return self._get_channel_for_process(channel)

    def manage(self, callable):

        """
        Wrap the given 'callable' in an object which can then be called in the
        same way as 'callable', but with new processes and communications
        managed automatically.
        """

        return ManagedCallable(callable, self)
479
class Persistent:

    """
    A mix-in class providing methods to exchanges for the management of
    persistent communications.
    """

    def start_waiting(self, channel):

        """
        Start a waiting process given the reception of data on the given
        'channel', adding the channel for the new process to the exchange.
        """

        # NOTE(review): this reads 'channel.address'; of the channel classes
        # NOTE(review): in this module only PersistentChannel appears to set
        # NOTE(review): that attribute - confirm what callers provide.

        pending = self._get_waiting(channel)
        if pending is None:
            return

        target, args, kw = pending
        self.add(start_persistent(channel.address, target, *args, **kw))

    def start(self, address, callable, *args, **kw):

        """
        Create a new process, located at the given 'address', for the given
        'callable' using any additional arguments provided, unless the work was
        queued because the limit on channels has been reached.

        The channel returned for the created process is not added to the
        exchange by this method: see the 'connect' method.
        """

        if not self._set_waiting(callable, args, kw):
            start_persistent(address, callable, *args, **kw)

    def create(self, address):

        """
        Create a new process, located at the given 'address', and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        return self._get_channel_for_process(create_persistent(address))

    def manage(self, address, callable):

        """
        Using the given 'address', publish the given 'callable' in an object
        which can then be called in the same way as 'callable', but with new
        processes and communications managed automatically.
        """

        managed = PersistentCallable(address, callable, self)
        return managed

    def connect(self, address):

        """
        Connect to a process which is contactable via the given 'address',
        adding the resulting channel to the exchange (waiting first if the limit
        on channels has been reached).
        """

        self.add_wait(connect_persistent(address))
540
class ManagedCallable:

    "A callable managed by an exchange."

    def __init__(self, callable, exchange):

        """
        Remember the given 'callable' together with the 'exchange' which will
        monitor the channels created for communications between this and the
        created processes. Note that the 'callable' must be parallel-aware (that
        is, have a 'channel' parameter). Use the MakeParallel class to wrap
        other kinds of callable objects.
        """

        self.exchange = exchange
        self.callable = callable

    def __call__(self, *args, **kw):

        """
        Ask the exchange to start the callable with the supplied arguments.
        Nothing is returned.
        """

        start = self.exchange.start
        start(self.callable, *args, **kw)
563
class PersistentCallable:

    "A callable which sets up a persistent communications channel."

    def __init__(self, address, callable, exchange):

        """
        Using the given 'address', remember the given 'callable' and the
        'exchange' used to monitor the channels created for communications
        between this and the created processes, so that when this object is
        called, a background process is started within which the 'callable' will
        run. Note that the 'callable' must be parallel-aware (that is, have a
        'channel' parameter). Use the MakeParallel class to wrap other kinds of
        callable objects.
        """

        self.address = address
        self.exchange = exchange
        self.callable = callable

    def __call__(self, *args, **kw):

        """
        Ask the exchange to start the callable at the stored address with the
        supplied arguments. Nothing is returned.
        """

        start = self.exchange.start
        start(self.address, self.callable, *args, **kw)
588
class BackgroundCallable:

    """
    A callable which sets up a persistent communications channel, but is
    unmanaged by an exchange.
    """

    def __init__(self, address, callable):

        """
        Using the given 'address', remember the given 'callable'. This object
        can then be invoked, but the wrapped callable will be run in a
        background process. Note that the 'callable' must be parallel-aware
        (that is, have a 'channel' parameter). Use the MakeParallel class to
        wrap other kinds of callable objects.
        """

        self.address = address
        self.callable = callable

    def __call__(self, *args, **kw):

        """
        Start the callable with the supplied arguments in a process reachable
        at the stored address. Nothing is returned.
        """

        address, target = self.address, self.callable
        start_persistent(address, target, *args, **kw)
# Abstractions and utilities.
class Map(Exchange):

    "An exchange which can be used like the built-in 'map' function."

    def __init__(self, *args, **kw):

        "Initialise the exchange, then the record of results."

        # NOTE(review): any 'channels' given here reach the 'add' method before
        # NOTE(review): 'init' has created the 'channels' attribute - confirm
        # NOTE(review): whether initial channels are meant to be supported.

        Exchange.__init__(self, *args, **kw)
        self.init()

    def init(self):

        "Remember the channel addition order to order output."

        self.results = []
        self.channels = {}
        self.channel_number = 0

    def add(self, channel):

        """
        Add the given 'channel' to the exchange, recording the position at which
        it was added.
        """

        Exchange.add(self, channel)
        position = self.channel_number
        self.channels[channel] = position
        self.channel_number = position + 1

    def start(self, callable, *args, **kw):

        """
        Reserve a place in the results, then create a new process for the given
        'callable' using any additional arguments provided, monitoring the
        channel created between this process and the created process.
        """

        # The placeholder is replaced when the result arrives.

        self.results.append(None)
        Exchange.start(self, callable, *args, **kw)

    def create(self):

        """
        Reserve a place in the results, then create a new process and return the
        created communications channel to the created process. In the creating
        process, return None - the channel receiving data from the created
        process will be automatically managed by this exchange.
        """

        # The placeholder is replaced when the result arrives.

        self.results.append(None)
        return Exchange.create(self)

    def __call__(self, callable, sequence):

        """
        Wrap and invoke 'callable' for each element in the 'sequence',
        discarding any previously recorded results. This object is returned so
        that the results can be accessed through it.
        """

        wrapped = callable
        if not isinstance(wrapped, MakeParallel):
            wrapped = MakeParallel(wrapped)

        self.init()

        # One process is requested for each element in the sequence.

        for element in sequence:
            self.start(wrapped, element)

        return self

    def __getitem__(self, i):

        "Wait for all channels to complete, then return the result at 'i'."

        self.finish()
        return self.results[i]

    def __iter__(self):

        "Wait for all channels to complete, then iterate over the results."

        self.finish()
        return iter(self.results)

    def store_data(self, channel):

        "Accumulate the incoming data, associating results with channels."

        data = channel.receive()
        position = self.channels[channel]
        self.results[position] = data
        del self.channels[channel]
699
class Queue(Exchange):

    """
    An exchange acting as a queue, making data from created processes available
    in the order in which it is received.
    """

    def __init__(self, *args, **kw):

        "Initialise the exchange and an empty queue."

        Exchange.__init__(self, *args, **kw)
        self.queue = []

    def store_data(self, channel):

        "Receive the incoming data from 'channel', adding it to the queue."

        # New data goes in at the front; 'next' takes data from the back.

        self.queue.insert(0, channel.receive())

    def __iter__(self):

        "Return this object, which acts as its own iterator."

        return self

    def next(self):

        """
        Return the next element in the queue, processing incoming data while the
        queue is empty and channels remain active. Raise StopIteration where the
        queue is empty and no channels are active.
        """

        while not self.queue:
            if not self.active():
                raise StopIteration
            self.store()

        return self.queue.pop()
733
class MakeParallel:

    "A wrapper around functions making them able to communicate results."

    def __init__(self, callable):

        """
        Initialise the wrapper with the given 'callable'. This object will then
        be able to accept a 'channel' parameter when invoked, and to forward the
        result of the given 'callable' via the channel provided back to the
        invoking process.
        """

        self.callable = callable

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable with the remaining arguments and send its result via
        the given 'channel'. Nothing is returned.
        """

        result = self.callable(*args, **kw)
        channel.send(result)
754
class MakeReusable(MakeParallel):

    """
    A wrapper around functions making them able to communicate results in a
    reusable fashion.
    """

    def __call__(self, channel, *args, **kw):

        """
        Invoke the callable and return its result via the given 'channel'. Then
        keep receiving (args, kw) pairs from the channel, invoking the callable
        and sending the result for each one, until None is received.
        """

        while 1:
            channel.send(self.callable(*args, **kw))

            details = channel.receive()
            if details is None:
                break

            args, kw = details
# Persistent variants.
class PersistentExchange(Persistent, Exchange):

    """
    An exchange which manages persistent communications, combining the
    Persistent mix-in with Exchange and adding nothing of its own.
    """
780
class PersistentQueue(Persistent, Queue):

    """
    A queue which manages persistent communications, combining the Persistent
    mix-in with Queue and adding nothing of its own.
    """
# Convenience functions.
def BackgroundQueue(address):

    """
    Connect to a process reachable via the given 'address', returning a queue
    (limited to a single channel) through which its results are accessible.
    """

    results = PersistentQueue(limit=1)
    results.connect(address)
    return results
799
def pmap(callable, sequence, limit=None):

    """
    A parallel version of the built-in map function with an optional process
    'limit'. The given 'callable' should not be parallel-aware (that is, have a
    'channel' parameter) since it will be wrapped for parallel communications
    before being invoked.

    Return the Map object through which the processed 'sequence' is accessed,
    each element in the sequence being processed by a different process.
    """

    return Map(limit=limit)(callable, sequence)
814 815 # Utility functions. 816 817 _cpuinfo_fields = "physical id", "core id" 818
def _get_number_of_cores():

    """
    Return the number of distinct, genuine processor cores, found by counting
    the distinct combinations of the '_cpuinfo_fields' values in /proc/cpuinfo.
    If the platform is not supported by this function (the file cannot be opened
    or read), None is returned.

    Note that zero is returned where the file contains no "physical id" lines.
    """

    try:
        f = open("/proc/cpuinfo")
        try:
            processors = set()
            processor = [None, None]

            for line in f:
                for i, field in enumerate(_cpuinfo_fields):
                    if line.startswith(field):
                        t = line.split(":")
                        processor[i] = int(t[1].strip())
                        break
                else:

                    # A "processor" line starts the next entry: record the
                    # details collected for the previous one.

                    if line.startswith("processor") and processor[0] is not None:
                        processors.add(tuple(processor))
                        processor = [None, None]

            # Record the details of the final entry.

            if processor[0] is not None:
                processors.add(tuple(processor))

            return len(processors)

        finally:
            f.close()

    # Failure to open a file is reported using IOError on Python 2, this being
    # unrelated to OSError there, so both must be handled.

    except (IOError, OSError):
        return None
853
854 -def _get_number_of_cores_solaris():
855 856 """ 857 Return the number of cores for OpenSolaris 2008.05 and possibly other 858 editions of Solaris. 859 """ 860 861 f = os.popen("psrinfo -p") 862 try: 863 return int(f.read().strip()) 864 finally: 865 f.close()
# Low-level functions.
def create_socketpair():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process. The channel in the created process
    has a 'pid' of zero; the channel in the creating process has the 'pid' of
    the created process.
    """

    parent, child = socket.socketpair()
    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()

    # Each process keeps its own end of the pair and closes the other end.

    if pid == 0:
        mine, other = child, parent
    else:
        mine, other = parent, child

    other.close()
    return Channel(pid, mine.makefile("r", 0), mine.makefile("w", 0))
887
def create_pipes():

    """
    Create a new process, returning a communications channel to both the
    creating process and the created process.

    This function uses pipes instead of a socket pair, since some platforms
    seem to have problems with poll and such socket pairs.
    """

    # One pipe carries data to the creating process (pr, cw), the other carries
    # data to the created process (cr, pw).

    pr, cw = os.pipe()
    cr, pw = os.pipe()

    pid = os.fork()

    # Each process keeps its own descriptors and closes those of the other.

    if pid == 0:
        unused = (pr, pw)
        reader, writer = cr, cw
    else:
        unused = (cr, cw)
        reader, writer = pr, pw

    for fd in unused:
        os.close(fd)

    return Channel(pid, os.fdopen(reader, "r", 0), os.fdopen(writer, "w", 0))
# Choose the process creation and core counting functions for this platform:
# pipes and psrinfo where the system is reported as "SunOS", a socket pair and
# /proc/cpuinfo everywhere else.

if platform.system() != "SunOS":
    create = create_socketpair
    get_number_of_cores = _get_number_of_cores
else:
    create = create_pipes
    get_number_of_cores = _get_number_of_cores_solaris
def create_persistent(address):

    """
    Create a new process, returning a persistent communications channel between
    the creating process and the created process. This channel can be
    disconnected from the creating process and connected to another process, and
    thus can be used to collect results from daemon processes.

    In order to be able to reconnect to created processes, the 'address' of the
    communications endpoint for the created process needs to be provided. This
    should be a filename.
    """

    parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    child.bind(address)

    parent.setblocking(1)
    child.setblocking(1)

    pid = os.fork()

    if pid != 0:

        # The creating process: note that its socket is not connected to the
        # address here.

        child.close()
        return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))

    # The created process: serve connections on the bound socket.

    parent.close()
    return PersistentChannel(pid, child, address)
946
def connect_persistent(address):

    """
    Connect via a persistent channel to an existing created process, reachable
    at the given 'address'. The returned channel has a 'pid' of zero.
    """

    connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    connection.setblocking(1)
    connection.connect(address)

    reader = connection.makefile("r", 0)
    writer = connection.makefile("w", 0)
    return Channel(0, reader, writer)
958
def exit(channel):

    """
    Terminate a created process, closing the given 'channel'. This function
    does not return: the process is terminated with an exit status of zero even
    where closing the channel raises an exception, since the created process
    would otherwise carry on running the program of the creating process.
    """

    try:
        channel.close()
    finally:
        os._exit(0)
967
def start(callable, *args, **kw):

    """
    Create a new process which shall start running in the given 'callable'.
    Additional arguments to the 'callable' can be given as additional arguments
    to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # The created process: run the callable, then terminate.

    try:
        try:
            callable(channel, *args, **kw)
        except:

            # Whatever was raised is sent to the creating process.

            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
992
def start_persistent(address, callable, *args, **kw):

    """
    Create a new process which shall be reachable using the given 'address' and
    which will start running in the given 'callable'. Additional arguments to
    the 'callable' can be given as additional arguments to this function.

    Return a communications channel to the creating process. For the created
    process, supply a channel as the 'channel' parameter in the given 'callable'
    so that it may send data back to the creating process.

    Note that the created process employs a channel which is persistent: it can
    withstand disconnection from the creating process and subsequent connections
    from other processes.
    """

    channel = create_persistent(address)
    if channel.pid != 0:
        return channel

    # The created process: detach from the standard streams, run the callable,
    # then terminate.

    close_streams()
    try:
        try:
            callable(channel, *args, **kw)
        except:

            # Whatever was raised is sent through the channel.

            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
1022
def close_streams():

    """
    Close streams which keep the current process attached to any creating
    processes: the file descriptors of standard input, standard output and
    standard error, in that order.
    """

    for stream in (sys.stdin, sys.stdout, sys.stderr):
        os.close(stream.fileno())
1033
def waitall():

    """
    Wait for all created processes to terminate, returning once os.wait raises
    OSError.
    """

    while 1:
        try:
            os.wait()
        except OSError:
            return
# vim: tabstop=4 expandtab shiftwidth=4