Module pprocess
[hide private]
[frames] | no frames]

Source Code for Module pprocess

   1  """ 
   2  A simple parallel processing API for Python, inspired somewhat by the thread 
   3  module, slightly less by pypar, and slightly less still by pypvm. 
   4   
   5  Copyright (C) 2005, 2006, 2007, 2008, 2009 Paul Boddie <paul@boddie.org.uk> 
   6   
   7  This program is free software; you can redistribute it and/or modify it under 
   8  the terms of the GNU Lesser General Public License as published by the Free 
   9  Software Foundation; either version 3 of the License, or (at your option) any 
  10  later version. 
  11   
  12  This program is distributed in the hope that it will be useful, but WITHOUT 
  13  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
  14  FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
  15  details. 
  16   
  17  You should have received a copy of the GNU Lesser General Public License along 
  18  with this program.  If not, see <http://www.gnu.org/licenses/>. 
  19  """ 
  20   
  21  __version__ = "0.5" 
  22   
  23  import os 
  24  import sys 
  25  import select 
  26  import socket 
  27  import platform 
  28   
  29  try: 
  30      import cPickle as pickle 
  31  except ImportError: 
  32      import pickle 
  33   
  34  try: 
  35      set 
  36  except NameError: 
  37      from sets import Set as set 
  38   
  39  # Special values. 
  40   
41 -class Undefined: pass
42 43 # Communications. 44
45 -class AcknowledgementError(Exception):
46 pass
47
48 -class Channel:
49 50 "A communications channel." 51
52 - def __init__(self, pid, read_pipe, write_pipe):
53 54 """ 55 Initialise the channel with a process identifier 'pid', a 'read_pipe' 56 from which messages will be received, and a 'write_pipe' into which 57 messages will be sent. 58 """ 59 60 self.pid = pid 61 self.read_pipe = read_pipe 62 self.write_pipe = write_pipe
63
64 - def __del__(self):
65 66 # Since signals don't work well with I/O, we close pipes and wait for 67 # created processes upon finalisation. 68 69 self.close()
70
71 - def close(self):
72 73 "Explicitly close the channel." 74 75 if self.read_pipe is not None: 76 self.read_pipe.close() 77 self.read_pipe = None 78 if self.write_pipe is not None: 79 self.write_pipe.close() 80 self.write_pipe = None
81 #self.wait(os.WNOHANG) 82
83 - def wait(self, options=0):
84 85 "Wait for the created process, if any, to exit." 86 87 if self.pid != 0: 88 try: 89 os.waitpid(self.pid, options) 90 except OSError: 91 pass
92
93 - def _send(self, obj):
94 95 "Send the given object 'obj' through the channel." 96 97 pickle.dump(obj, self.write_pipe) 98 self.write_pipe.flush()
99
100 - def send(self, obj):
101 102 """ 103 Send the given object 'obj' through the channel. Then wait for an 104 acknowledgement. (The acknowledgement makes the caller wait, thus 105 preventing processes from exiting and disrupting the communications 106 channel and losing data.) 107 """ 108 109 self._send(obj) 110 if self._receive() != "OK": 111 raise AcknowledgementError, obj
112
113 - def _receive(self):
114 115 "Receive an object through the channel, returning the object." 116 117 obj = pickle.load(self.read_pipe) 118 if isinstance(obj, Exception): 119 raise obj 120 else: 121 return obj
122
123 - def receive(self):
124 125 """ 126 Receive an object through the channel, returning the object. Send an 127 acknowledgement of receipt. (The acknowledgement makes the sender wait, 128 thus preventing processes from exiting and disrupting the communications 129 channel and losing data.) 130 """ 131 132 try: 133 obj = self._receive() 134 return obj 135 finally: 136 self._send("OK")
137
138 -class PersistentChannel(Channel):
139 140 """ 141 A persistent communications channel which can handle peer disconnection, 142 acting as a server, meaning that this channel is associated with a specific 143 address which can be contacted by other processes. 144 """ 145
146 - def __init__(self, pid, endpoint, address):
147 Channel.__init__(self, pid, None, None) 148 self.endpoint = endpoint 149 self.address = address 150 self.poller = select.poll() 151 152 # Listen for connections before this process is interested in 153 # communicating. It is not desirable to wait for connections at this 154 # point because this will block the process. 155 156 self.endpoint.listen(1)
157
158 - def close(self):
159 160 "Close the persistent channel and remove the socket file." 161 162 Channel.close(self) 163 try: 164 os.unlink(self.address) 165 except OSError: 166 pass
167
168 - def _ensure_pipes(self):
169 170 "Ensure that the channel is capable of communicating." 171 172 if self.read_pipe is None or self.write_pipe is None: 173 174 # Accept any incoming connections. 175 176 endpoint, address = self.endpoint.accept() 177 self.read_pipe = endpoint.makefile("r", 0) 178 self.write_pipe = endpoint.makefile("w", 0) 179 180 # Monitor the write pipe for error conditions. 181 182 fileno = self.write_pipe.fileno() 183 self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)
184
185 - def _reset_pipes(self):
186 187 "Discard the existing connection." 188 189 fileno = self.write_pipe.fileno() 190 self.poller.unregister(fileno) 191 self.read_pipe = None 192 self.write_pipe = None 193 self.endpoint.listen(1)
194
195 - def _ensure_communication(self, timeout=None):
196 197 "Ensure that sending and receiving are possible." 198 199 while 1: 200 self._ensure_pipes() 201 fileno = self.write_pipe.fileno() 202 fds = self.poller.poll(timeout) 203 for fd, status in fds: 204 if fd != fileno: 205 continue 206 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): 207 208 # Broken connection: discard it and start all over again. 209 210 self._reset_pipes() 211 break 212 else: 213 return
214
215 - def _send(self, obj):
216 217 "Send the given object 'obj' through the channel." 218 219 self._ensure_communication() 220 Channel._send(self, obj)
221
222 - def _receive(self):
223 224 "Receive an object through the channel, returning the object." 225 226 self._ensure_communication() 227 return Channel._receive(self)
228 229 # Management of processes and communications. 230
231 -class Exchange:
232 233 """ 234 A communications exchange that can be used to detect channels which are 235 ready to communicate. Subclasses of this class can define the 'store_data' 236 method in order to enable the 'add_wait', 'wait' and 'finish' methods. 237 238 Once exchanges are populated with active channels, use of the principal 239 methods of the exchange typically cause the 'store' method to be invoked, 240 resulting in the processing of any incoming data. 241 """ 242
243 - def __init__(self, channels=None, limit=None, reuse=0, continuous=0, autoclose=1):
244 245 """ 246 Initialise the exchange with an optional list of 'channels'. 247 248 If the optional 'limit' is specified, restrictions on the addition of 249 new channels can be enforced and observed through the 'add_wait', 'wait' 250 and 'finish' methods. To make use of these methods, create a subclass of 251 this class and define a working 'store_data' method. 252 253 If the optional 'reuse' parameter is set to a true value, channels and 254 processes will be reused for waiting computations, but the callable will 255 be invoked for each computation. 256 257 If the optional 'continuous' parameter is set to a true value, channels 258 and processes will be retained after receiving data sent from such 259 processes, since it will be assumed that they will communicate more 260 data. 261 262 If the optional 'autoclose' parameter is set to a false value, channels 263 will not be closed automatically when they are removed from the exchange 264 - by default they are closed when removed. 265 """ 266 267 self.limit = limit 268 self.reuse = reuse 269 self.autoclose = autoclose 270 self.continuous = continuous 271 272 self.waiting = [] 273 self.readables = {} 274 self.removed = [] 275 self.poller = select.poll() 276 277 for channel in channels or []: 278 self.add(channel)
279 280 # Core methods, registering and reporting on channels. 281
282 - def add(self, channel):
283 284 "Add the given 'channel' to the exchange." 285 286 fileno = channel.read_pipe.fileno() 287 self.readables[fileno] = channel 288 self.poller.register(fileno, select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR)
289
290 - def active(self):
291 292 "Return a list of active channels." 293 294 return self.readables.values()
295
296 - def ready(self, timeout=None):
297 298 """ 299 Wait for a period of time specified by the optional 'timeout' in 300 milliseconds (or until communication is possible) and return a list of 301 channels which are ready to be read from. 302 """ 303 304 fds = self.poller.poll(timeout) 305 readables = [] 306 self.removed = [] 307 308 for fd, status in fds: 309 channel = self.readables[fd] 310 removed = 0 311 312 # Remove ended/error channels. 313 314 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR): 315 self.remove(channel) 316 self.removed.append(channel) 317 removed = 1 318 319 # Record readable channels. 320 321 if status & select.POLLIN: 322 if not (removed and self.autoclose): 323 readables.append(channel) 324 325 return readables
326
327 - def remove(self, channel):
328 329 """ 330 Remove the given 'channel' from the exchange. 331 """ 332 333 fileno = channel.read_pipe.fileno() 334 del self.readables[fileno] 335 self.poller.unregister(fileno) 336 if self.autoclose: 337 channel.close() 338 channel.wait()
339 340 # Enhanced exchange methods involving channel limits. 341
342 - def unfinished(self):
343 344 "Return whether the exchange still has work scheduled or in progress." 345 346 return self.active() or self.waiting
347
348 - def busy(self):
349 350 "Return whether the exchange uses as many channels as it is allowed to." 351 352 return self.limit is not None and len(self.active()) >= self.limit
353
354 - def add_wait(self, channel):
355 356 """ 357 Add the given 'channel' to the exchange, waiting if the limit on active 358 channels would be exceeded by adding the channel. 359 """ 360 361 self.wait() 362 self.add(channel)
363
364 - def wait(self):
365 366 """ 367 Test for the limit on channels, blocking and reading incoming data until 368 the number of channels is below the limit. 369 """ 370 371 # If limited, block until channels have been closed. 372 373 while self.busy(): 374 self.store()
375
376 - def finish(self):
377 378 """ 379 Finish the use of the exchange by waiting for all channels to complete. 380 """ 381 382 while self.unfinished(): 383 self.store()
384
385 - def store(self, timeout=None):
386 387 """ 388 For each ready channel, process the incoming data. If the optional 389 'timeout' parameter (a duration in milliseconds) is specified, wait only 390 for the specified duration if no channels are ready to provide data. 391 """ 392 393 # Either process input from active channels. 394 395 if self.active(): 396 for channel in self.ready(timeout): 397 self.store_data(channel) 398 self.start_waiting(channel) 399 400 # Or schedule new processes and channels. 401 402 else: 403 while self.waiting and not self.busy(): 404 callable, args, kw = self.waiting.pop() 405 self.start(callable, *args, **kw)
406
407 - def store_data(self, channel):
408 409 """ 410 Store incoming data from the specified 'channel'. In subclasses of this 411 class, such data could be stored using instance attributes. 412 """ 413 414 raise NotImplementedError, "store_data"
415 416 # Support for the convenience methods. 417
418 - def _get_waiting(self, channel):
419 420 """ 421 Get waiting callable and argument information for new processes, given 422 the reception of data on the given 'channel'. 423 """ 424 425 # For continuous channels, no scheduling is requested. 426 427 if self.waiting and not self.continuous: 428 429 # Schedule this callable and arguments. 430 431 callable, args, kw = self.waiting.pop() 432 433 # Try and reuse existing channels if possible. 434 435 if self.reuse: 436 437 # Re-add the channel - this may update information related to 438 # the channel in subclasses. 439 440 self.add(channel) 441 channel.send((args, kw)) 442 443 else: 444 return callable, args, kw 445 446 # Where channels are being reused, but where no processes are waiting 447 # any more, send a special value to tell them to quit. 448 449 elif self.reuse: 450 channel.send(None) 451 452 return None
453
454 - def _set_waiting(self, callable, args, kw):
455 456 """ 457 Support process creation by returning whether the given 'callable' has 458 been queued for later invocation. 459 """ 460 461 if self.busy(): 462 self.waiting.insert(0, (callable, args, kw)) 463 return 1 464 else: 465 return 0
466
467 - def _get_channel_for_process(self, channel):
468 469 """ 470 Support process creation by returning the given 'channel' to the 471 creating process, and None to the created process. 472 """ 473 474 if channel.pid == 0: 475 return channel 476 else: 477 self.add_wait(channel) 478 return None
479 480 # Methods for overriding, related to the convenience methods. 481
482 - def start_waiting(self, channel):
483 484 """ 485 Start a waiting process given the reception of data on the given 486 'channel'. 487 """ 488 489 details = self._get_waiting(channel) 490 if details is not None: 491 callable, args, kw = details 492 self.add(start(callable, *args, **kw))
493 494 # Convenience methods. 495
496 - def start(self, callable, *args, **kw):
497 498 """ 499 Create a new process for the given 'callable' using any additional 500 arguments provided. Then, monitor the channel created between this 501 process and the created process. 502 """ 503 504 if self._set_waiting(callable, args, kw): 505 return 506 507 self.add_wait(start(callable, *args, **kw))
508
509 - def create(self):
510 511 """ 512 Create a new process and return the created communications channel to 513 the created process. In the creating process, return None - the channel 514 receiving data from the created process will be automatically managed by 515 this exchange. 516 """ 517 518 channel = create() 519 return self._get_channel_for_process(channel)
520
521 - def manage(self, callable):
522 523 """ 524 Wrap the given 'callable' in an object which can then be called in the 525 same way as 'callable', but with new processes and communications 526 managed automatically. 527 """ 528 529 return ManagedCallable(callable, self)
530
531 -class Persistent:
532 533 """ 534 A mix-in class providing methods to exchanges for the management of 535 persistent communications. 536 """ 537
538 - def start_waiting(self, channel):
539 540 """ 541 Start a waiting process given the reception of data on the given 542 'channel'. 543 """ 544 545 details = self._get_waiting(channel) 546 if details is not None: 547 callable, args, kw = details 548 self.add(start_persistent(channel.address, callable, *args, **kw))
549
550 - def start(self, address, callable, *args, **kw):
551 552 """ 553 Create a new process, located at the given 'address', for the given 554 'callable' using any additional arguments provided. Then, monitor the 555 channel created between this process and the created process. 556 """ 557 558 if self._set_waiting(callable, args, kw): 559 return 560 561 start_persistent(address, callable, *args, **kw)
562
563 - def create(self, address):
564 565 """ 566 Create a new process, located at the given 'address', and return the 567 created communications channel to the created process. In the creating 568 process, return None - the channel receiving data from the created 569 process will be automatically managed by this exchange. 570 """ 571 572 channel = create_persistent(address) 573 return self._get_channel_for_process(channel)
574
575 - def manage(self, address, callable):
576 577 """ 578 Using the given 'address', publish the given 'callable' in an object 579 which can then be called in the same way as 'callable', but with new 580 processes and communications managed automatically. 581 """ 582 583 return PersistentCallable(address, callable, self)
584
585 - def connect(self, address):
586 587 "Connect to a process which is contactable via the given 'address'." 588 589 channel = connect_persistent(address) 590 self.add_wait(channel)
591
592 -class ManagedCallable:
593 594 "A callable managed by an exchange." 595
596 - def __init__(self, callable, exchange):
597 598 """ 599 Wrap the given 'callable', using the given 'exchange' to monitor the 600 channels created for communications between this and the created 601 processes. Note that the 'callable' must be parallel-aware (that is, 602 have a 'channel' parameter). Use the MakeParallel class to wrap other 603 kinds of callable objects. 604 """ 605 606 self.callable = callable 607 self.exchange = exchange
608
609 - def __call__(self, *args, **kw):
610 611 "Invoke the callable with the supplied arguments." 612 613 self.exchange.start(self.callable, *args, **kw)
614
615 -class PersistentCallable:
616 617 "A callable which sets up a persistent communications channel." 618
619 - def __init__(self, address, callable, exchange):
620 621 """ 622 Using the given 'address', wrap the given 'callable', using the given 623 'exchange' to monitor the channels created for communications between 624 this and the created processes, so that when it is called, a background 625 process is started within which the 'callable' will run. Note that the 626 'callable' must be parallel-aware (that is, have a 'channel' parameter). 627 Use the MakeParallel class to wrap other kinds of callable objects. 628 """ 629 630 self.callable = callable 631 self.exchange = exchange 632 self.address = address
633
634 - def __call__(self, *args, **kw):
635 636 "Invoke the callable with the supplied arguments." 637 638 self.exchange.start(self.address, self.callable, *args, **kw)
639
640 -class BackgroundCallable:
641 642 """ 643 A callable which sets up a persistent communications channel, but is 644 unmanaged by an exchange. 645 """ 646
647 - def __init__(self, address, callable):
648 649 """ 650 Using the given 'address', wrap the given 'callable'. This object can 651 then be invoked, but the wrapped callable will be run in a background 652 process. Note that the 'callable' must be parallel-aware (that is, have 653 a 'channel' parameter). Use the MakeParallel class to wrap other kinds 654 of callable objects. 655 """ 656 657 self.callable = callable 658 self.address = address
659
660 - def __call__(self, *args, **kw):
661 662 "Invoke the callable with the supplied arguments." 663 664 start_persistent(self.address, self.callable, *args, **kw)
665 666 # Abstractions and utilities. 667
668 -class Map(Exchange):
669 670 "An exchange which can be used like the built-in 'map' function." 671
672 - def __init__(self, *args, **kw):
673 Exchange.__init__(self, *args, **kw) 674 self.init()
675
676 - def init(self):
677 678 "Remember the channel addition order to order output." 679 680 self.channel_number = 0 681 self.channels = {} 682 self.results = [] 683 self.current_index = 0
684
685 - def add(self, channel):
686 687 "Add the given 'channel' to the exchange." 688 689 Exchange.add(self, channel) 690 self.channels[channel] = self.channel_number 691 self.channel_number += 1
692
693 - def start(self, callable, *args, **kw):
694 695 """ 696 Create a new process for the given 'callable' using any additional 697 arguments provided. Then, monitor the channel created between this 698 process and the created process. 699 """ 700 701 self.results.append(Undefined) # placeholder 702 Exchange.start(self, callable, *args, **kw)
703
704 - def create(self):
705 706 """ 707 Create a new process and return the created communications channel to 708 the created process. In the creating process, return None - the channel 709 receiving data from the created process will be automatically managed by 710 this exchange. 711 """ 712 713 self.results.append(Undefined) # placeholder 714 return Exchange.create(self)
715
716 - def __call__(self, callable, sequence):
717 718 "Wrap and invoke 'callable' for each element in the 'sequence'." 719 720 if not isinstance(callable, MakeParallel): 721 wrapped = MakeParallel(callable) 722 else: 723 wrapped = callable 724 725 self.init() 726 727 # Start processes for each element in the sequence. 728 729 for i in sequence: 730 self.start(wrapped, i) 731 732 # Access to the results occurs through this object. 733 734 return self
735
736 - def store_data(self, channel):
737 738 "Accumulate the incoming data, associating results with channels." 739 740 data = channel.receive() 741 self.results[self.channels[channel]] = data 742 del self.channels[channel]
743
744 - def __iter__(self):
745 return self
746
747 - def next(self):
748 749 "Return the next element in the map." 750 751 try: 752 return self._next() 753 except IndexError: 754 pass 755 756 while self.unfinished(): 757 self.store() 758 try: 759 return self._next() 760 except IndexError: 761 pass 762 else: 763 raise StopIteration
764
765 - def __getitem__(self, i):
766 767 "Return element 'i' from the map." 768 769 try: 770 return self._get(i) 771 except IndexError: 772 pass 773 774 while self.unfinished(): 775 self.store() 776 try: 777 return self._get(i) 778 except IndexError: 779 pass 780 else: 781 raise IndexError, i
782 783 # Helper methods for the above access methods. 784
785 - def _next(self):
786 result = self._get(self.current_index) 787 self.current_index += 1 788 return result
789
790 - def _get(self, i):
791 result = self.results[i] 792 if result is Undefined or isinstance(i, slice) and Undefined in result: 793 raise IndexError, i 794 return result
795
796 -class Queue(Exchange):
797 798 """ 799 An exchange acting as a queue, making data from created processes available 800 in the order in which it is received. 801 """ 802
803 - def __init__(self, *args, **kw):
804 Exchange.__init__(self, *args, **kw) 805 self.queue = []
806
807 - def store_data(self, channel):
808 809 "Accumulate the incoming data, associating results with channels." 810 811 data = channel.receive() 812 self.queue.insert(0, data)
813
814 - def __iter__(self):
815 return self
816
817 - def next(self):
818 819 "Return the next element in the queue." 820 821 if self.queue: 822 return self.queue.pop() 823 824 while self.unfinished(): 825 self.store() 826 if self.queue: 827 return self.queue.pop() 828 else: 829 raise StopIteration
830
831 - def __len__(self):
832 833 "Return the current length of the queue." 834 835 return len(self.queue)
836
837 -class MakeParallel:
838 839 "A wrapper around functions making them able to communicate results." 840
841 - def __init__(self, callable):
842 843 """ 844 Initialise the wrapper with the given 'callable'. This object will then 845 be able to accept a 'channel' parameter when invoked, and to forward the 846 result of the given 'callable' via the channel provided back to the 847 invoking process. 848 """ 849 850 self.callable = callable
851
852 - def __call__(self, channel, *args, **kw):
853 854 "Invoke the callable and return its result via the given 'channel'." 855 856 channel.send(self.callable(*args, **kw))
857
858 -class MakeReusable(MakeParallel):
859 860 """ 861 A wrapper around functions making them able to communicate results in a 862 reusable fashion. 863 """ 864
865 - def __call__(self, channel, *args, **kw):
866 867 "Invoke the callable and return its result via the given 'channel'." 868 869 channel.send(self.callable(*args, **kw)) 870 t = channel.receive() 871 while t is not None: 872 args, kw = t 873 channel.send(self.callable(*args, **kw)) 874 t = channel.receive()
875 876 # Persistent variants. 877
878 -class PersistentExchange(Persistent, Exchange):
879 880 "An exchange which manages persistent communications." 881 882 pass
883
884 -class PersistentQueue(Persistent, Queue):
885 886 "A queue which manages persistent communications." 887 888 pass
889 890 # Convenience functions. 891
892 -def BackgroundQueue(address):
893 894 """ 895 Connect to a process reachable via the given 'address', making the results 896 of which accessible via a queue. 897 """ 898 899 queue = PersistentQueue(limit=1) 900 queue.connect(address) 901 return queue
902
903 -def pmap(callable, sequence, limit=None):
904 905 """ 906 A parallel version of the built-in map function with an optional process 907 'limit'. The given 'callable' should not be parallel-aware (that is, have a 908 'channel' parameter) since it will be wrapped for parallel communications 909 before being invoked. 910 911 Return the processed 'sequence' where each element in the sequence is 912 processed by a different process. 913 """ 914 915 mymap = Map(limit=limit) 916 return mymap(callable, sequence)
917 918 # Utility functions. 919 920 _cpuinfo_fields = "processor", "physical id", "core id" 921
922 -def _get_number_of_cores():
923 924 """ 925 Return the number of distinct, genuine processor cores. If the platform is 926 not supported by this function, None is returned. 927 """ 928 929 try: 930 f = open("/proc/cpuinfo") 931 try: 932 processors = set() 933 934 # Use the _cpuinfo_field values as "digits" in a larger unique 935 # core identifier. 936 937 processor = [None, None, None] 938 939 for line in f.xreadlines(): 940 for i, field in enumerate(_cpuinfo_fields): 941 942 # Where the field is found, insert the value into the 943 # appropriate location in the processor identifier. 944 945 if line.startswith(field): 946 t = line.split(":") 947 processor[i] = int(t[1].strip()) 948 break 949 950 # Where a new processor description is started, record the 951 # identifier. 952 953 if line.startswith("processor") and processor[0] is not None: 954 processors.add(tuple(processor)) 955 processor = [None, None, None] 956 957 # At the end of reading the file, add any unrecorded processors. 958 959 if processor[0] is not None: 960 processors.add(tuple(processor)) 961 962 return len(processors) 963 964 finally: 965 f.close() 966 967 except OSError: 968 return None
969
970 -def _get_number_of_cores_solaris():
971 972 """ 973 Return the number of cores for OpenSolaris 2008.05 and possibly other 974 editions of Solaris. 975 """ 976 977 f = os.popen("psrinfo -p") 978 try: 979 return int(f.read().strip()) 980 finally: 981 f.close()
982 983 # Low-level functions. 984
985 -def create_socketpair():
986 987 """ 988 Create a new process, returning a communications channel to both the 989 creating process and the created process. 990 """ 991 992 parent, child = socket.socketpair() 993 for s in [parent, child]: 994 s.setblocking(1) 995 996 pid = os.fork() 997 if pid == 0: 998 parent.close() 999 return Channel(pid, child.makefile("r", 0), child.makefile("w", 0)) 1000 else: 1001 child.close() 1002 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
1003
1004 -def create_pipes():
1005 1006 """ 1007 Create a new process, returning a communications channel to both the 1008 creating process and the created process. 1009 1010 This function uses pipes instead of a socket pair, since some platforms 1011 seem to have problems with poll and such socket pairs. 1012 """ 1013 1014 pr, cw = os.pipe() 1015 cr, pw = os.pipe() 1016 1017 pid = os.fork() 1018 if pid == 0: 1019 os.close(pr) 1020 os.close(pw) 1021 return Channel(pid, os.fdopen(cr, "r", 0), os.fdopen(cw, "w", 0)) 1022 else: 1023 os.close(cr) 1024 os.close(cw) 1025 return Channel(pid, os.fdopen(pr, "r", 0), os.fdopen(pw, "w", 0))
1026 1027 if platform.system() == "SunOS": 1028 create = create_pipes 1029 get_number_of_cores = _get_number_of_cores_solaris 1030 else: 1031 create = create_socketpair 1032 get_number_of_cores = _get_number_of_cores 1033
1034 -def create_persistent(address):
1035 1036 """ 1037 Create a new process, returning a persistent communications channel between 1038 the creating process and the created process. This channel can be 1039 disconnected from the creating process and connected to another process, and 1040 thus can be used to collect results from daemon processes. 1041 1042 In order to be able to reconnect to created processes, the 'address' of the 1043 communications endpoint for the created process needs to be provided. This 1044 should be a filename. 1045 """ 1046 1047 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1048 child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1049 child.bind(address) 1050 1051 for s in [parent, child]: 1052 s.setblocking(1) 1053 1054 pid = os.fork() 1055 if pid == 0: 1056 parent.close() 1057 return PersistentChannel(pid, child, address) 1058 else: 1059 child.close() 1060 #parent.connect(address) 1061 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
1062
1063 -def connect_persistent(address):
1064 1065 """ 1066 Connect via a persistent channel to an existing created process, reachable 1067 at the given 'address'. 1068 """ 1069 1070 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1071 parent.setblocking(1) 1072 parent.connect(address) 1073 return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0))
1074
1075 -def exit(channel):
1076 1077 """ 1078 Terminate a created process, closing the given 'channel'. 1079 """ 1080 1081 channel.close() 1082 os._exit(0)
1083
1084 -def start(callable, *args, **kw):
1085 1086 """ 1087 Create a new process which shall start running in the given 'callable'. 1088 Additional arguments to the 'callable' can be given as additional arguments 1089 to this function. 1090 1091 Return a communications channel to the creating process. For the created 1092 process, supply a channel as the 'channel' parameter in the given 'callable' 1093 so that it may send data back to the creating process. 1094 """ 1095 1096 channel = create() 1097 if channel.pid == 0: 1098 try: 1099 try: 1100 callable(channel, *args, **kw) 1101 except: 1102 exc_type, exc_value, exc_traceback = sys.exc_info() 1103 channel.send(exc_value) 1104 finally: 1105 exit(channel) 1106 else: 1107 return channel
1108
1109 -def start_persistent(address, callable, *args, **kw):
1110 1111 """ 1112 Create a new process which shall be reachable using the given 'address' and 1113 which will start running in the given 'callable'. Additional arguments to 1114 the 'callable' can be given as additional arguments to this function. 1115 1116 Return a communications channel to the creating process. For the created 1117 process, supply a channel as the 'channel' parameter in the given 'callable' 1118 so that it may send data back to the creating process. 1119 1120 Note that the created process employs a channel which is persistent: it can 1121 withstand disconnection from the creating process and subsequent connections 1122 from other processes. 1123 """ 1124 1125 channel = create_persistent(address) 1126 if channel.pid == 0: 1127 close_streams() 1128 try: 1129 try: 1130 callable(channel, *args, **kw) 1131 except: 1132 exc_type, exc_value, exc_traceback = sys.exc_info() 1133 channel.send(exc_value) 1134 finally: 1135 exit(channel) 1136 else: 1137 return channel
1138
1139 -def close_streams():
1140 1141 """ 1142 Close streams which keep the current process attached to any creating 1143 processes. 1144 """ 1145 1146 os.close(sys.stdin.fileno()) 1147 os.close(sys.stdout.fileno()) 1148 os.close(sys.stderr.fileno())
1149
1150 -def waitall():
1151 1152 "Wait for all created processes to terminate." 1153 1154 try: 1155 while 1: 1156 os.wait() 1157 except OSError: 1158 pass
1159 1160 # vim: tabstop=4 expandtab shiftwidth=4 1161