1 """
2 A simple parallel processing API for Python, inspired somewhat by the thread
3 module, slightly less by pypar, and slightly less still by pypvm.
4
5 Copyright (C) 2005, 2006, 2007, 2008 Paul Boddie <paul@boddie.org.uk>
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU Lesser General Public License as published by the Free
9 Software Foundation; either version 3 of the License, or (at your option) any
10 later version.
11
12 This program is distributed in the hope that it will be useful, but WITHOUT
13 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 details.
16
17 You should have received a copy of the GNU Lesser General Public License along
18 with this program. If not, see <http://www.gnu.org/licenses/>.
19 """
20
21 __version__ = "0.4"
22
23 import os
24 import sys
25 import select
26 import socket
27 import platform
28
29 try:
30 import cPickle as pickle
31 except ImportError:
32 import pickle
33
34 try:
35 set
36 except NameError:
37 from sets import Set as set
38
39
40
43
45
46 "A communications channel."
47
def __init__(self, pid, read_pipe, write_pipe):

    """
    Set up the channel for the process identified by 'pid'. Incoming
    messages are read from 'read_pipe' and outgoing messages are written to
    'write_pipe'.
    """

    self.pid, self.read_pipe, self.write_pipe = pid, read_pipe, write_pipe
59
61
62
63
64
65 self.close()
66
68
69 "Explicitly close the channel."
70
71 if self.read_pipe is not None:
72 self.read_pipe.close()
73 self.read_pipe = None
74 if self.write_pipe is not None:
75 self.write_pipe.close()
76 self.write_pipe = None
77
78
def wait(self, options=0):

    """
    Wait for the process created alongside this channel, if there is one, to
    exit. The 'options' value is passed unchanged to os.waitpid.
    """

    # A pid of zero means that there is no created process to wait for.

    if self.pid == 0:
        return

    try:
        os.waitpid(self.pid, options)
    except OSError:
        # Failure to wait is deliberately ignored.
        pass
88
90
91 "Send the given object 'obj' through the channel."
92
93 pickle.dump(obj, self.write_pipe)
94 self.write_pipe.flush()
95
def send(self, obj):

    """
    Send the given object 'obj' through the channel, then wait for an
    acknowledgement. (The acknowledgement makes the caller wait, thus
    preventing processes from exiting and disrupting the communications
    channel and losing data.)

    Raise AcknowledgementError, constructed with 'obj', if the reply read
    from the channel is anything other than "OK".
    """

    self._send(obj)
    if self._receive() != "OK":

        # The call form of raise is equivalent to the old "raise E, value"
        # statement, but is also accepted by newer Python versions.

        raise AcknowledgementError(obj)
108
110
111 "Receive an object through the channel, returning the object."
112
113 obj = pickle.load(self.read_pipe)
114 if isinstance(obj, Exception):
115 raise obj
116 else:
117 return obj
118
120
121 """
122 Receive an object through the channel, returning the object. Send an
123 acknowledgement of receipt. (The acknowledgement makes the sender wait,
124 thus preventing processes from exiting and disrupting the communications
125 channel and losing data.)
126 """
127
128 try:
129 obj = self._receive()
130 return obj
131 finally:
132 self._send("OK")
133
135
136 """
137 A persistent communications channel which can handle peer disconnection,
138 acting as a server, meaning that this channel is associated with a specific
139 address which can be contacted by other processes.
140 """
141
def __init__(self, pid, endpoint, address):

    """
    Initialise the channel for the process 'pid' without any pipes, keeping
    the 'endpoint' socket and the 'address' associated with this channel.
    """

    Channel.__init__(self, pid, None, None)
    self.endpoint, self.address = endpoint, address
    self.poller = select.poll()

    # Start listening on the endpoint so that connections can be accepted
    # later.

    self.endpoint.listen(1)
153
155
156 "Close the persistent channel and remove the socket file."
157
158 Channel.close(self)
159 try:
160 os.unlink(self.address)
161 except OSError:
162 pass
163
165
166 "Ensure that the channel is capable of communicating."
167
168 if self.read_pipe is None or self.write_pipe is None:
169
170
171
172 endpoint, address = self.endpoint.accept()
173 self.read_pipe = endpoint.makefile("r", 0)
174 self.write_pipe = endpoint.makefile("w", 0)
175
176
177
178 fileno = self.write_pipe.fileno()
179 self.poller.register(fileno, select.POLLOUT | select.POLLHUP | select.POLLNVAL | select.POLLERR)
180
182
183 "Discard the existing connection."
184
185 fileno = self.write_pipe.fileno()
186 self.poller.unregister(fileno)
187 self.read_pipe = None
188 self.write_pipe = None
189 self.endpoint.listen(1)
190
192
193 "Ensure that sending and receiving are possible."
194
195 while 1:
196 self._ensure_pipes()
197 fileno = self.write_pipe.fileno()
198 fds = self.poller.poll(timeout)
199 for fd, status in fds:
200 if fd != fileno:
201 continue
202 if status & (select.POLLHUP | select.POLLNVAL | select.POLLERR):
203
204
205
206 self._reset_pipes()
207 break
208 else:
209 return
210
217
224
225
226
228
229 """
230 A communications exchange that can be used to detect channels which are
231 ready to communicate. Subclasses of this class can define the 'store_data'
232 method in order to enable the 'add_wait', 'wait' and 'finish' methods.
233 """
234
def __init__(self, channels=None, limit=None, reuse=0, autoclose=1):

    """
    Initialise the exchange, adding each of the optional 'channels' to it.

    The optional 'limit' restricts the number of active channels, as
    enforced and observed by the 'add_wait', 'wait' and 'finish' methods.
    Those methods require a subclass which defines a working 'store_data'
    method.

    A true value for the optional 'reuse' parameter causes channels and
    processes to be reused for waiting computations.

    A false value for the optional 'autoclose' parameter prevents channels
    from being closed automatically when they are removed from the exchange;
    by default they are closed upon removal.
    """

    self.limit = limit
    self.reuse = reuse
    self.autoclose = autoclose

    # Bookkeeping: queued invocations, channels keyed by file descriptor,
    # and the channels removed during the most recent poll.

    self.waiting = []
    self.readables = {}
    self.removed = []
    self.poller = select.poll()

    if channels:
        for initial in channels:
            self.add(initial)
262
def add(self, channel):

    "Register the given 'channel' with the exchange for polling."

    descriptor = channel.read_pipe.fileno()
    events = select.POLLIN | select.POLLHUP | select.POLLNVAL | select.POLLERR

    # Remember which channel owns the descriptor before polling for it.

    self.readables[descriptor] = channel
    self.poller.register(descriptor, events)
270
272
273 "Return a list of active channels."
274
275 return self.readables.values()
276
def ready(self, timeout=None):

    """
    Poll the registered channels, waiting for at most the optional 'timeout'
    (or until communication is possible), and return a list of the channels
    which can be read from.

    Channels reporting a hang-up, an invalid descriptor or an error are
    removed from the exchange and recorded in the 'removed' attribute, which
    is reset on every call.
    """

    failure = select.POLLHUP | select.POLLNVAL | select.POLLERR
    events = self.poller.poll(timeout)
    self.removed = []
    available = []

    for descriptor, flags in events:
        channel = self.readables[descriptor]
        dropped = flags & failure

        # Remove channels which can no longer communicate.

        if dropped:
            self.remove(channel)
            self.removed.append(channel)

        # Report channels with pending input, except for those which have
        # just been removed and automatically closed.

        if flags & select.POLLIN and not (dropped and self.autoclose):
            available.append(channel)

    return available
307
309
310 """
311 Remove the given 'channel' from the exchange.
312 """
313
314 fileno = channel.read_pipe.fileno()
315 del self.readables[fileno]
316 self.poller.unregister(fileno)
317 if self.autoclose:
318 channel.close()
319 channel.wait()
320
321
322
324
325 """
326 Add the given 'channel' to the exchange, waiting if the limit on active
327 channels would be exceeded by adding the channel.
328 """
329
330 self.wait()
331 self.add(channel)
332
334
335 """
336 Test for the limit on channels, blocking and reading incoming data until
337 the number of channels is below the limit.
338 """
339
340
341
342 while self.limit is not None and len(self.active()) >= self.limit:
343 self.store()
344
346
347 """
348 Get waiting callable and argument information for new processes, given
349 the reception of data on the given 'channel'.
350 """
351
352 if self.waiting:
353 callable, args, kw = self.waiting.pop()
354
355
356
357 if self.reuse:
358
359
360
361
362 self.add(channel)
363 channel.send((args, kw))
364 else:
365 return callable, args, kw
366
367
368
369
370 elif self.reuse:
371 channel.send(None)
372
373 return None
374
376
377 """
378 Finish the use of the exchange by waiting for all channels to complete.
379 """
380
381 while self.active():
382 self.store()
383
385
386 "For each ready channel, process the incoming data."
387
388 for channel in self.ready():
389 self.store_data(channel)
390 self.start_waiting(channel)
391
def store_data(self, channel):

    """
    Store incoming data from the specified 'channel'. In subclasses of this
    class, such data could be stored using instance attributes.

    This base implementation always raises NotImplementedError with the
    message "store_data".
    """

    # The call form of raise is equivalent to the old "raise E, value"
    # statement, but is also accepted by newer Python versions.

    raise NotImplementedError("store_data")
400
401
402
404
405 """
406 Support process creation by returning whether the given 'callable' has
407 been queued for later invocation.
408 """
409
410 if self.limit is not None and len(self.active()) >= self.limit:
411 self.waiting.insert(0, (callable, args, kw))
412 return 1
413 else:
414 return 0
415
417
418 """
419 Support process creation by returning the given 'channel' to the
420 creating process, and None to the created process.
421 """
422
423 if channel.pid == 0:
424 return channel
425 else:
426 self.add_wait(channel)
427 return None
428
429
430
432
433 """
434 Start a waiting process given the reception of data on the given
435 'channel'.
436 """
437
438 details = self._get_waiting(channel)
439 if details is not None:
440 callable, args, kw = details
441 self.add(start(callable, *args, **kw))
442
443
444
def start(self, callable, *args, **kw):

    """
    Run the given 'callable' in a new process, passing on any additional
    arguments, and have this exchange monitor the channel to that process.
    Where '_set_waiting' reports that the invocation has been queued, no
    process is created at this point.
    """

    queued = self._set_waiting(callable, args, kw)
    if not queued:
        self.add_wait(start(callable, *args, **kw))
457
459
460 """
461 Create a new process and return the created communications channel to
462 the created process. In the creating process, return None - the channel
463 receiving data from the created process will be automatically managed by
464 this exchange.
465 """
466
467 channel = create()
468 return self._get_channel_for_process(channel)
469
471
472 """
473 Wrap the given 'callable' in an object which can then be called in the
474 same way as 'callable', but with new processes and communications
475 managed automatically.
476 """
477
478 return ManagedCallable(callable, self)
479
481
482 """
483 A mix-in class providing methods to exchanges for the management of
484 persistent communications.
485 """
486
488
489 """
490 Start a waiting process given the reception of data on the given
491 'channel'.
492 """
493
494 details = self._get_waiting(channel)
495 if details is not None:
496 callable, args, kw = details
497 self.add(start_persistent(channel.address, callable, *args, **kw))
498
def start(self, address, callable, *args, **kw):

    """
    Run the given 'callable' in a new process located at the given
    'address', passing on any additional arguments. Where '_set_waiting'
    reports that the invocation has been queued, no process is created at
    this point.

    The channel returned by 'start_persistent' is not added to the exchange
    by this method.
    """

    queued = self._set_waiting(callable, args, kw)
    if not queued:
        start_persistent(address, callable, *args, **kw)
511
513
514 """
515 Create a new process, located at the given 'address', and return the
516 created communications channel to the created process. In the creating
517 process, return None - the channel receiving data from the created
518 process will be automatically managed by this exchange.
519 """
520
521 channel = create_persistent(address)
522 return self._get_channel_for_process(channel)
523
def manage(self, address, callable):

    """
    Publish the given 'callable' at the given 'address', returning a
    PersistentCallable tied to this exchange. The returned object can be
    called in the same way as 'callable', with new processes and
    communications managed automatically.
    """

    published = PersistentCallable(address, callable, self)
    return published
533
535
536 "Connect to a process which is contactable via the given 'address'."
537
538 channel = connect_persistent(address)
539 self.add_wait(channel)
540
542
543 "A callable managed by an exchange."
544
def __init__(self, callable, exchange):

    """
    Remember the given 'callable' together with the 'exchange' which will
    monitor the channels created for communications between this and the
    created processes. The 'callable' must be parallel-aware (that is, have
    a 'channel' parameter); the MakeParallel class can wrap other kinds of
    callable objects.
    """

    self.callable, self.exchange = callable, exchange
557
559
560 "Invoke the callable with the supplied arguments."
561
562 self.exchange.start(self.callable, *args, **kw)
563
565
566 "A callable which sets up a persistent communications channel."
567
def __init__(self, address, callable, exchange):

    """
    Remember the given 'address' and 'callable' together with the 'exchange'
    which will monitor the channels created for communications between this
    and the created processes, so that invoking this object starts a
    background process within which the 'callable' will run. The 'callable'
    must be parallel-aware (that is, have a 'channel' parameter); the
    MakeParallel class can wrap other kinds of callable objects.
    """

    self.address = address
    self.callable, self.exchange = callable, exchange
582
584
585 "Invoke the callable with the supplied arguments."
586
587 self.exchange.start(self.address, self.callable, *args, **kw)
588
590
591 """
592 A callable which sets up a persistent communications channel, but is
593 unmanaged by an exchange.
594 """
595
597
598 """
599 Using the given 'address', wrap the given 'callable'. This object can
600 then be invoked, but the wrapped callable will be run in a background
601 process. Note that the 'callable' must be parallel-aware (that is, have
602 a 'channel' parameter). Use the MakeParallel class to wrap other kinds
603 of callable objects.
604 """
605
606 self.callable = callable
607 self.address = address
608
610
611 "Invoke the callable with the supplied arguments."
612
613 start_persistent(self.address, self.callable, *args, **kw)
614
615
616
617 -class Map(Exchange):
618
619 "An exchange which can be used like the built-in 'map' function."
620
624
626
627 "Remember the channel addition order to order output."
628
629 self.channel_number = 0
630 self.channels = {}
631 self.results = []
632
def add(self, channel):

    """
    Add the given 'channel' to the exchange, recording the position at which
    it was added.
    """

    Exchange.add(self, channel)

    # Number the channels in the order of their addition.

    position = self.channel_number
    self.channels[channel] = position
    self.channel_number = position + 1
640
def start(self, callable, *args, **kw):

    """
    Reserve a place in the results for the outcome of the given 'callable',
    then start it through the base exchange with any additional arguments
    provided.
    """

    # The placeholder extends the results by one entry per invocation.

    self.results.append(None)
    Exchange.start(self, callable, *args, **kw)
651
653
654 """
655 Create a new process and return the created communications channel to
656 the created process. In the creating process, return None - the channel
657 receiving data from the created process will be automatically managed by
658 this exchange.
659 """
660
661 self.results.append(None)
662 return Exchange.create(self)
663
def __call__(self, callable, sequence):

    """
    Invoke 'callable' for each element in the 'sequence', wrapping it with
    MakeParallel unless it is already a MakeParallel instance. Return this
    exchange.
    """

    wrapped = callable
    if not isinstance(wrapped, MakeParallel):
        wrapped = MakeParallel(wrapped)

    self.init()

    # Start one invocation for each element of the sequence.

    for element in sequence:
        self.start(wrapped, element)

    return self
683
685 self.finish()
686 return self.results[i]
687
689 self.finish()
690 return iter(self.results)
691
693
694 "Accumulate the incoming data, associating results with channels."
695
696 data = channel.receive()
697 self.results[self.channels[channel]] = data
698 del self.channels[channel]
699
701
702 """
703 An exchange acting as a queue, making data from created processes available
704 in the order in which it is received.
705 """
706
710
712
713 "Accumulate the incoming data, associating results with channels."
714
715 data = channel.receive()
716 self.queue.insert(0, data)
717
720
722
723 "Return the next element in the queue."
724
725 if self.queue:
726 return self.queue.pop()
727 while self.active():
728 self.store()
729 if self.queue:
730 return self.queue.pop()
731 else:
732 raise StopIteration
733
735
736 "A wrapper around functions making them able to communicate results."
737
739
740 """
741 Initialise the wrapper with the given 'callable'. This object will then
742 be able to accept a 'channel' parameter when invoked, and to forward the
743 result of the given 'callable' via the channel provided back to the
744 invoking process.
745 """
746
747 self.callable = callable
748
def __call__(self, channel, *args, **kw):

    """
    Call the wrapped callable with the supplied arguments and send its
    result through the given 'channel'.
    """

    result = self.callable(*args, **kw)
    channel.send(result)
754
756
757 """
758 A wrapper around functions making them able to communicate results in a
759 reusable fashion.
760 """
761
def __call__(self, channel, *args, **kw):

    """
    Call the wrapped callable and send its result through the given
    'channel', repeating this for each new (args, kw) pair received through
    the channel until None is received.
    """

    while True:
        channel.send(self.callable(*args, **kw))

        # A reply of None ends the work; anything else is the next set of
        # arguments.

        request = channel.receive()
        if request is None:
            break
        args, kw = request
772
773
774
776
777 "An exchange which manages persistent communications."
778
779 pass
780
782
783 "A queue which manages persistent communications."
784
785 pass
786
787
788
790
791 """
792 Connect to a process reachable via the given 'address', making the results
793 of which accessible via a queue.
794 """
795
796 queue = PersistentQueue(limit=1)
797 queue.connect(address)
798 return queue
799
def pmap(callable, sequence, limit=None):

    """
    A parallel counterpart of the built-in map function, with an optional
    process 'limit'. The given 'callable' should not be parallel-aware (that
    is, have a 'channel' parameter) since it will be wrapped for parallel
    communications before being invoked.

    Return the Map exchange which results from invoking it with 'callable'
    and 'sequence'.
    """

    return Map(limit=limit)(callable, sequence)
814
815
816
817 _cpuinfo_fields = "physical id", "core id"
818
def _get_number_of_cores():

    """
    Return the number of distinct, genuine processor cores, as determined
    from the distinct combinations of the fields named in '_cpuinfo_fields'
    found in /proc/cpuinfo. If the platform is not supported by this
    function (the file cannot be opened or read), None is returned.
    """

    try:
        f = open("/proc/cpuinfo")
        try:
            processors = set()
            processor = [None, None]

            # Iterate over the file directly; xreadlines is deprecated.

            for line in f:
                for i, field in enumerate(_cpuinfo_fields):
                    if line.startswith(field):
                        t = line.split(":")
                        processor[i] = int(t[1].strip())
                        break
                else:
                    # A new "processor" entry completes the previous one.

                    if line.startswith("processor") and processor[0] is not None:
                        processors.add(tuple(processor))
                        processor = [None, None]

            # Record the final entry in the file.

            if processor[0] is not None:
                processors.add(tuple(processor))

            return len(processors)

        finally:
            f.close()

    # Opening or reading a file raises IOError on older Python versions,
    # which is distinct from OSError there.

    except (IOError, OSError):
        return None
853
855
856 """
857 Return the number of cores for OpenSolaris 2008.05 and possibly other
858 editions of Solaris.
859 """
860
861 f = os.popen("psrinfo -p")
862 try:
863 return int(f.read().strip())
864 finally:
865 f.close()
866
867
868
870
871 """
872 Create a new process, returning a communications channel to both the
873 creating process and the created process.
874 """
875
876 parent, child = socket.socketpair()
877 for s in [parent, child]:
878 s.setblocking(1)
879
880 pid = os.fork()
881 if pid == 0:
882 parent.close()
883 return Channel(pid, child.makefile("r", 0), child.makefile("w", 0))
884 else:
885 child.close()
886 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
887
889
890 """
891 Create a new process, returning a communications channel to both the
892 creating process and the created process.
893
894 This function uses pipes instead of a socket pair, since some platforms
895 seem to have problems with poll and such socket pairs.
896 """
897
898 pr, cw = os.pipe()
899 cr, pw = os.pipe()
900
901 pid = os.fork()
902 if pid == 0:
903 os.close(pr)
904 os.close(pw)
905 return Channel(pid, os.fdopen(cr, "r", 0), os.fdopen(cw, "w", 0))
906 else:
907 os.close(cr)
908 os.close(cw)
909 return Channel(pid, os.fdopen(pr, "r", 0), os.fdopen(pw, "w", 0))
910
# Select the process creation and core counting functions for this platform:
# pipes and the Solaris-specific core count on SunOS, a socket pair and the
# general core count elsewhere.

if platform.system() != "SunOS":
    create = create_socketpair
    get_number_of_cores = _get_number_of_cores
else:
    create = create_pipes
    get_number_of_cores = _get_number_of_cores_solaris
917
919
920 """
921 Create a new process, returning a persistent communications channel between
922 the creating process and the created process. This channel can be
923 disconnected from the creating process and connected to another process, and
924 thus can be used to collect results from daemon processes.
925
926 In order to be able to reconnect to created processes, the 'address' of the
927 communications endpoint for the created process needs to be provided. This
928 should be a filename.
929 """
930
931 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
932 child = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
933 child.bind(address)
934
935 for s in [parent, child]:
936 s.setblocking(1)
937
938 pid = os.fork()
939 if pid == 0:
940 parent.close()
941 return PersistentChannel(pid, child, address)
942 else:
943 child.close()
944
945 return Channel(pid, parent.makefile("r", 0), parent.makefile("w", 0))
946
948
949 """
950 Connect via a persistent channel to an existing created process, reachable
951 at the given 'address'.
952 """
953
954 parent = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
955 parent.setblocking(1)
956 parent.connect(address)
957 return Channel(0, parent.makefile("r", 0), parent.makefile("w", 0))
958
960
961 """
962 Terminate a created process, closing the given 'channel'.
963 """
964
965 channel.close()
966 os._exit(0)
967
def start(callable, *args, **kw):

    """
    Create a new process which runs the given 'callable', passing any
    additional arguments given to this function on to the 'callable'.

    In the creating process, return the communications channel. In the
    created process, the channel is supplied as the first argument to the
    'callable' so that it may send data back to the creating process; the
    process then terminates through 'exit' whether or not the 'callable'
    succeeded.
    """

    channel = create()
    if channel.pid != 0:
        return channel

    # From here on, this is the created process.

    try:
        try:
            callable(channel, *args, **kw)
        except:
            # Whatever was raised is sent back through the channel.
            channel.send(sys.exc_info()[1])
    finally:
        exit(channel)
992
994
995 """
996 Create a new process which shall be reachable using the given 'address' and
997 which will start running in the given 'callable'. Additional arguments to
998 the 'callable' can be given as additional arguments to this function.
999
1000 Return a communications channel to the creating process. For the created
1001 process, supply a channel as the 'channel' parameter in the given 'callable'
1002 so that it may send data back to the creating process.
1003
1004 Note that the created process employs a channel which is persistent: it can
1005 withstand disconnection from the creating process and subsequent connections
1006 from other processes.
1007 """
1008
1009 channel = create_persistent(address)
1010 if channel.pid == 0:
1011 close_streams()
1012 try:
1013 try:
1014 callable(channel, *args, **kw)
1015 except:
1016 exc_type, exc_value, exc_traceback = sys.exc_info()
1017 channel.send(exc_value)
1018 finally:
1019 exit(channel)
1020 else:
1021 return channel
1022
1024
1025 """
1026 Close streams which keep the current process attached to any creating
1027 processes.
1028 """
1029
1030 os.close(sys.stdin.fileno())
1031 os.close(sys.stdout.fileno())
1032 os.close(sys.stderr.fileno())
1033
1035
1036 "Wait for all created processes to terminate."
1037
1038 try:
1039 while 1:
1040 os.wait()
1041 except OSError:
1042 pass
1043
1044
1045