1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 """Core XMPP stream functionality.
20
21 Normative reference:
22 - `RFC 3920 <http://www.ietf.org/rfc/rfc3920.txt>`__
23 """
24
25 __revision__="$Id: streambase.py 700 2010-04-03 15:34:59Z jajcus $"
26 __docformat__="restructuredtext en"
27
28 import libxml2
29 import socket
30 import os
31 import time
32 import random
33 import threading
34 import errno
35 import logging
36
37
38 from pyxmpp import xmlextra
39 from pyxmpp.expdict import ExpiringDictionary
40 from pyxmpp.utils import to_utf8
41 from pyxmpp.stanza import Stanza
42 from pyxmpp.error import StreamErrorNode
43 from pyxmpp.iq import Iq
44 from pyxmpp.presence import Presence
45 from pyxmpp.message import Message
46 from pyxmpp.jid import JID
47 from pyxmpp import resolver
48 from pyxmpp.stanzaprocessor import StanzaProcessor
49 from pyxmpp.exceptions import StreamError, StreamEncryptionRequired, HostMismatch, ProtocolError
50 from pyxmpp.exceptions import FatalStreamError, StreamParseError, StreamAuthenticationError
51
52 STREAM_NS="http://etherx.jabber.org/streams"
53 BIND_NS="urn:ietf:params:xml:ns:xmpp-bind"
54
65
66 -class StreamBase(StanzaProcessor,xmlextra.StreamHandler):
67 """Base class for a generic XMPP stream.
68
69 Responsible for establishing connection, parsing the stream, dispatching
70 received stanzas to appropriate handlers and sending application's stanzas.
71 This doesn't provide any authentication or encryption (both required by
72 the XMPP specification) and is not usable on its own.
73
74 Whenever we say "stream" here we actually mean two streams
75 (incoming and outgoing) of one connection, as defined by the XMPP
76 specification.
77
78 :Ivariables:
79 - `lock`: RLock object used to synchronize access to Stream object.
80 - `features`: stream features as announced by the receiving entity.
81 - `me`: local stream endpoint JID.
82 - `peer`: remote stream endpoint JID.
83 - `process_all_stanzas`: when `True` then all stanzas received are
84 considered local.
85 - `initiator`: `True` if local stream endpoint is the initiating entity.
86 - `owner`: `Client`, `Component` or similar object "owning" this stream.
87 - `_reader`: the stream reader object (push parser) for the stream.
88 """
def __init__(self, default_ns, extra_ns = (), keepalive = 0, owner = None):
    """Set up a new, not yet connected, stream object.

    :Parameters:
        - `default_ns`: URI of the stream's default namespace
          ("jabber:client" for a client stream, "jabber:server" for
          a server one, etc.)
        - `extra_ns`: sequence of additional namespace URIs to be declared
          on the stream. An empty list is stored when nothing is given.
        - `keepalive`: interval of the keepalive output. 0 disables it.
        - `owner`: the `Client`, `Component` or similar object which
          "owns" this stream.
    """
    # Both base classes are initialized explicitly.
    StanzaProcessor.__init__(self)
    xmlextra.StreamHandler.__init__(self)

    self.default_ns_uri = default_ns
    self.extra_ns_uris = extra_ns or []
    self.keepalive = keepalive
    self._reader_lock = threading.Lock()
    self.process_all_stanzas = False
    self.port = None

    # The per-connection state is set up by `_reset`.
    self._reset()

    self.owner = owner
    self.__logger = logging.getLogger("pyxmpp.Stream")
114
116 """Reset `Stream` object state making it ready to handle new
117 connections."""
118 self.doc_in=None
119 self.doc_out=None
120 self.socket=None
121 self._reader=None
122 self.addr=None
123 self.default_ns=None
124 self.extra_ns={}
125 self.stream_ns=None
126 self._reader=None
127 self.ioreader=None
128 self.me=None
129 self.peer=None
130 self.skip=False
131 self.stream_id=None
132 self._iq_response_handlers=ExpiringDictionary()
133 self._iq_get_handlers={}
134 self._iq_set_handlers={}
135 self._message_handlers=[]
136 self._presence_handlers=[]
137 self.eof=False
138 self.initiator=None
139 self.features=None
140 self.authenticated=False
141 self.peer_authenticated=False
142 self.auth_method_used=None
143 self.version=None
144 self.last_keepalive=False
145
148
150 """Initialize stream on outgoing connection.
151
152 :Parameters:
153 - `sock`: connected socket for the stream
154 - `to`: name of the remote host
155 """
156 self.eof=0
157 self.socket=sock
158 if to:
159 self.peer=JID(to)
160 else:
161 self.peer=None
162 self.initiator=1
163 self._send_stream_start()
164 self._make_reader()
165
def connect(self,addr,port,service=None,to=None):
    """Connect the stream to the XMPP entity at the given address.

    [initiating entity only]

    Holds `self.lock` while `_connect` does the actual work and hands
    back whatever `_connect` returns.

    :Parameters:
        - `addr`: name or IP address of the peer
        - `port`: number of the port to connect to
        - `service`: name of the service (looked up via SRV DNS records)
        - `to`: name of the peer, when it is not the same as `addr`
    """
    with self.lock:
        return self._connect(addr, port, service, to)
182
def _connect(self,addr,port,service=None,to=None):
    """Same as `Stream.connect` but assume `self.lock` is acquired.

    When `service` is given the candidate (host, port) pairs come from
    the SRV lookup, falling back to the plain `addr` and `port` pair if
    the lookup yields nothing. Every address a candidate resolves to is
    tried in turn until one of them accepts the connection.

    :Parameters:
        - `addr`: peer name or IP address
        - `port`: port number to connect to
        - `service`: service name (to be resolved using SRV DNS records)
        - `to`: peer name if different than `addr`

    :raise socket.error: the last connection error seen, when every
        attempted address failed.
    :raise FatalStreamError: when there was no address to try at all.
    """
    # `unicode` exists on Python 2 only; fall back to plain `str`
    # elsewhere so the type check below works on both.
    try:
        text_types = (str, unicode)
    except NameError:
        text_types = (str,)

    if to is None:
        to = str(addr)

    candidates = None
    if service is not None:
        self.state_change("resolving srv", (addr, service))
        candidates = resolver.resolve_srv(addr, service)
    if not candidates:
        candidates = [(addr, port)]

    # The error is kept under its own name: the target of
    # `except ... as` is not available after the handler on Python 3.
    last_error = None
    sock = None
    for addr, port in candidates:
        if isinstance(addr, text_types):
            self.state_change("resolving", addr)
        sock = None
        for res in resolver.getaddrinfo(addr, port, 0, socket.SOCK_STREAM):
            family, socktype, proto, _unused, sockaddr = res
            try:
                sock = socket.socket(family, socktype, proto)
                self.state_change("connecting", sockaddr)
                sock.connect(sockaddr)
                self.state_change("connected", sockaddr)
            except socket.error as err:
                last_error = err
                self.__logger.debug("Connect to %r failed", sockaddr)
                # Do not leak the half-open socket of a failed attempt.
                if sock is not None:
                    sock.close()
                    sock = None
                continue
            break
        if sock is not None:
            break

    if sock is None:
        if last_error is not None:
            raise last_error
        raise FatalStreamError("Cannot connect")

    # `addr` and `port` are those of the candidate that connected.
    self.addr = addr
    self.port = port
    self._connect_socket(sock, to)
    self.last_keepalive = time.time()
225
def accept(self,sock,myname):
    """Accept a connection coming in on a listening socket.

    [receiving entity only]

    Holds `self.lock` while `_accept` does the actual work and hands
    back whatever `_accept` returns.

    :Parameters:
        - `sock`: the listening socket.
        - `myname`: name of the local stream endpoint."""
    with self.lock:
        return self._accept(sock, myname)
239
241 """Same as `Stream.accept` but assume `self.lock` is acquired."""
242 self.eof=0
243 self.socket,addr=sock.accept()
244 self.__logger.debug("Connection from: %r" % (addr,))
245 self.addr,self.port=addr
246 if myname:
247 self.me=JID(myname)
248 else:
249 self.me=None
250 self.initiator=0
251 self._make_reader()
252 self.last_keepalive=time.time()
253
255 """Gracefully close the connection."""
256 self.lock.acquire()
257 try:
258 return self._disconnect()
259 finally:
260 self.lock.release()
261
263 """Same as `Stream.disconnect` but assume `self.lock` is acquired."""
264 if self.doc_out:
265 self._send_stream_end()
266
267 - def _post_connect(self):
268 """Called when connection is established.
269
270 This method is supposed to be overriden in derived classes."""
271 pass
272
273 - def _post_auth(self):
274 """Called when connection is authenticated.
275
276 This method is supposed to be overriden in derived classes."""
277 pass
278
280 """Called when connection state is changed.
281
282 This method is supposed to be overridden in derived classes
283 or replaced by an application.
284
285 It may be used to display the connection progress."""
286 self.__logger.debug("State: %s: %r" % (state,arg))
287
289 """Forcibly close the connection and clear the stream state."""
290 self.lock.acquire()
291 try:
292 return self._close()
293 finally:
294 self.lock.release()
295
297 """Same as `Stream.close` but assume `self.lock` is acquired."""
298 self._disconnect()
299 if self.doc_in:
300 self.doc_in=None
301 if self.features:
302 self.features=None
303 self._reader=None
304 self.stream_id=None
305 if self.socket:
306 self.socket.close()
307 self._reset()
308
310 """Create new `xmlextra.StreamReader` instance as `self._reader`."""
311 self._reader=xmlextra.StreamReader(self)
312
314 """Process <stream:stream> (stream start) tag received from peer.
315
316 :Parameters:
317 - `doc`: document created by the parser"""
318 self.doc_in=doc
319 self.__logger.debug("input document: %r" % (self.doc_in.serialize(),))
320
321 try:
322 r=self.doc_in.getRootElement()
323 if r.ns().getContent() != STREAM_NS:
324 self._send_stream_error("invalid-namespace")
325 raise FatalStreamError,"Invalid namespace."
326 except libxml2.treeError:
327 self._send_stream_error("invalid-namespace")
328 raise FatalStreamError,"Couldn't get the namespace."
329
330 self.version=r.prop("version")
331 if self.version and self.version!="1.0":
332 self._send_stream_error("unsupported-version")
333 raise FatalStreamError,"Unsupported protocol version."
334
335 to_from_mismatch=0
336 if self.initiator:
337 self.stream_id=r.prop("id")
338 peer=r.prop("from")
339 if peer:
340 peer=JID(peer)
341 if self.peer:
342 if peer and peer!=self.peer:
343 self.__logger.debug("peer hostname mismatch:"
344 " %r != %r" % (peer,self.peer))
345 to_from_mismatch=1
346 else:
347 self.peer=peer
348 else:
349 to=r.prop("to")
350 if to:
351 to=self.check_to(to)
352 if not to:
353 self._send_stream_error("host-unknown")
354 raise FatalStreamError,'Bad "to"'
355 self.me=JID(to)
356 self._send_stream_start(self.generate_id())
357 self._send_stream_features()
358 self.state_change("fully connected",self.peer)
359 self._post_connect()
360
361 if not self.version:
362 self.state_change("fully connected",self.peer)
363 self._post_connect()
364
365 if to_from_mismatch:
366 raise HostMismatch
367
369 """Process </stream:stream> (stream end) tag received from peer.
370
371 :Parameters:
372 - `_unused`: document created by the parser"""
373 self.__logger.debug("Stream ended")
374 self.eof=1
375 if self.doc_out:
376 self._send_stream_end()
377 if self.doc_in:
378 self.doc_in=None
379 self._reader=None
380 if self.features:
381 self.features=None
382 self.state_change("disconnected",self.peer)
383
385 """Process stanza (first level child element of the stream) start tag
386 -- do nothing.
387
388 :Parameters:
389 - `doc`: parsed document
390 - `node`: stanza's full XML
391 """
392 pass
393
def stanza(self, _unused, node):
    """Handle a complete first level child element of the stream.

    The element is simply handed over to `_process_node`.

    :Parameters:
        - `_unused`: the parsed document (ignored)
        - `node`: full XML of the stanza
    """
    process_node = self._process_node
    process_node(node)
402
404 """Handle stream XML parse error.
405
406 :Parameters:
407 - `descr`: error description
408 """
409 raise StreamParseError,descr
410
412 """Send stream end tag."""
413 self.doc_out.getRootElement().addContent(" ")
414 s=self.doc_out.getRootElement().serialize(encoding="UTF-8")
415 end=s.rindex("<")
416 try:
417 self._write_raw(s[end:])
418 except (IOError,SystemError,socket.error),e:
419 self.__logger.debug("Sending stream closing tag failed:"+str(e))
420 self.doc_out.freeDoc()
421 self.doc_out=None
422 if self.features:
423 self.features=None
424
426 """Send stream start tag."""
427 if self.doc_out:
428 raise StreamError,"Stream start already sent"
429 self.doc_out=libxml2.newDoc("1.0")
430 root=self.doc_out.newChild(None, "stream", None)
431 self.stream_ns=root.newNs(STREAM_NS,"stream")
432 root.setNs(self.stream_ns)
433 self.default_ns=root.newNs(self.default_ns_uri,None)
434 for prefix,uri in self.extra_ns:
435 self.extra_ns[uri]=root.newNs(uri,prefix)
436 if self.peer and self.initiator:
437 root.setProp("to",self.peer.as_utf8())
438 if self.me and not self.initiator:
439 root.setProp("from",self.me.as_utf8())
440 root.setProp("version","1.0")
441 if sid:
442 root.setProp("id",sid)
443 self.stream_id=sid
444 sr=self.doc_out.serialize(encoding="UTF-8")
445 self._write_raw(sr[:sr.find("/>")]+">")
446
460
462 """Restart the stream as needed after SASL and StartTLS negotiation."""
463 self._reader=None
464
465 self.doc_out=None
466
467
468 self.doc_in=None
469 self.features=None
470 if self.initiator:
471 self._send_stream_start(self.stream_id)
472 self._make_reader()
473
475 """Create the <features/> element for the stream.
476
477 [receiving entity only]
478
479 :returns: new <features/> element node."""
480 root=self.doc_out.getRootElement()
481 features=root.newChild(root.ns(),"features",None)
482 return features
483
490
492 """Write raw data to the stream socket.
493
494 :Parameters:
495 - `data`: data to send"""
496 self.lock.acquire()
497 try:
498 return self._write_raw(data)
499 finally:
500 self.lock.release()
501
503 """Same as `Stream.write_raw` but assume `self.lock` is acquired."""
504 logging.getLogger("pyxmpp.Stream.out").debug("OUT: %r",data)
505 try:
506 self.socket.send(data)
507 except (IOError,OSError,socket.error),e:
508 raise FatalStreamError("IO Error: "+str(e))
509
511 """Write XML `xmlnode` to the stream.
512
513 :Parameters:
514 - `xmlnode`: XML node to send."""
515 if self.eof or not self.socket or not self.doc_out:
516 self.__logger.debug("Dropping stanza: %r" % (xmlnode,))
517 return
518 xmlnode=xmlnode.docCopyNode(self.doc_out,1)
519 self.doc_out.addChild(xmlnode)
520 try:
521 ns = xmlnode.ns()
522 except libxml2.treeError:
523 ns = None
524 if ns and ns.content == xmlextra.COMMON_NS:
525 xmlextra.replace_ns(xmlnode, ns, self.default_ns)
526 s = xmlextra.safe_serialize(xmlnode)
527 self._write_raw(s)
528 xmlnode.unlinkNode()
529 xmlnode.freeNode()
530
def send(self,stanza):
    """Send a stanza over the stream.

    Holds `self.lock` while `_send` does the actual work and hands
    back whatever `_send` returns.

    :Parameters:
        - `stanza`: the XMPP stanza to be sent."""
    with self.lock:
        return self._send(stanza)
541
553
555 """Do some housekeeping (cache expiration, timeout handling).
556
557 This method should be called periodically from the application's
558 main loop."""
559 self.lock.acquire()
560 try:
561 return self._idle()
562 finally:
563 self.lock.release()
564
566 """Same as `Stream.idle` but assume `self.lock` is acquired."""
567 self._iq_response_handlers.expire()
568 if not self.socket or self.eof:
569 return
570 now=time.time()
571 if self.keepalive and now-self.last_keepalive>=self.keepalive:
572 self._write_raw(" ")
573 self.last_keepalive=now
574
576 """Return filedescriptor of the stream socket."""
577 self.lock.acquire()
578 try:
579 return self.socket.fileno()
580 finally:
581 self.lock.release()
582
def loop(self,timeout):
    """Run a simple "main loop" for the stream.

    With `self.lock` held, keeps calling `_loop_iter` until the stream
    reaches its end or has no socket. `_idle` is called after every
    iteration which reported no activity.

    :Parameters:
        - `timeout`: passed to each `_loop_iter` call."""
    with self.lock:
        while not (self.eof or self.socket is None):
            if not self._loop_iter(timeout):
                self._idle()
593
595 """Single iteration of a simple "main loop" for the stream."""
596 self.lock.acquire()
597 try:
598 return self._loop_iter(timeout)
599 finally:
600 self.lock.release()
601
603 """Same as `Stream.loop_iter` but assume `self.lock` is acquired."""
604 import select
605 self.lock.release()
606 try:
607 if not self.socket:
608 time.sleep(timeout)
609 return False
610 try:
611 ifd, _unused, efd = select.select( [self.socket], [], [self.socket], timeout )
612 except select.error,e:
613 if e.args[0]!=errno.EINTR:
614 raise
615 ifd, _unused, efd=[], [], []
616 finally:
617 self.lock.acquire()
618 if self.socket in ifd or self.socket in efd:
619 self._process()
620 return True
621 else:
622 return False
623
625 """Process stream's pending events.
626
627 Should be called whenever there is input available
628 on `self.fileno()` socket descriptor. Is called by
629 `self.loop_iter`."""
630 self.lock.acquire()
631 try:
632 self._process()
633 finally:
634 self.lock.release()
635
637 """Same as `Stream.process` but assume `self.lock` is acquired."""
638 try:
639 try:
640 self._read()
641 except (xmlextra.error,),e:
642 self.__logger.exception("Exception during read()")
643 raise StreamParseError(unicode(e))
644 except:
645 raise
646 except (IOError,OSError,socket.error),e:
647 self.close()
648 raise FatalStreamError("IO Error: "+str(e))
649 except (FatalStreamError,KeyboardInterrupt,SystemExit),e:
650 self.close()
651 raise
652
654 """Read data pending on the stream socket and pass it to the parser."""
655 self.__logger.debug("StreamBase._read(), socket: %r",self.socket)
656 if self.eof:
657 return
658 try:
659 r=self.socket.recv(1024)
660 except socket.error,e:
661 if e.args[0]!=errno.EINTR:
662 raise
663 return
664 self._feed_reader(r)
665
667 """Feed the stream reader with data received.
668
669 If `data` is None or empty, then stream end (peer disconnected) is
670 assumed and the stream is closed.
671
672 :Parameters:
673 - `data`: data received from the stream socket.
674 :Types:
675 - `data`: `unicode`
676 """
677 logging.getLogger("pyxmpp.Stream.in").debug("IN: %r",data)
678 if data:
679 try:
680 r=self._reader.feed(data)
681 while r:
682 r=self._reader.feed("")
683 if r is None:
684 self.eof=1
685 self.disconnect()
686 except StreamParseError:
687 self._send_stream_error("xml-not-well-formed")
688 raise
689 else:
690 self.eof=1
691 self.disconnect()
692 if self.eof:
693 self.stream_end(None)
694
696 """Process first level element of the stream.
697
698 The element may be stream error or features, StartTLS
699 request/response, SASL request/response or a stanza.
700
701 :Parameters:
702 - `xmlnode`: XML node describing the element
703 """
704 ns_uri=xmlnode.ns().getContent()
705 if ns_uri=="http://etherx.jabber.org/streams":
706 self._process_stream_node(xmlnode)
707 return
708
709 if ns_uri==self.default_ns_uri:
710 stanza=stanza_factory(xmlnode, self)
711 self.lock.release()
712 try:
713 self.process_stanza(stanza)
714 finally:
715 self.lock.acquire()
716 stanza.free()
717 else:
718 self.__logger.debug("Unhandled node: %r" % (xmlnode.serialize(),))
719
721 """Process first level stream-namespaced element of the stream.
722
723 The element may be stream error or stream features.
724
725 :Parameters:
726 - `xmlnode`: XML node describing the element
727 """
728 if xmlnode.name=="error":
729 e=StreamErrorNode(xmlnode)
730 self.lock.release()
731 try:
732 self.process_stream_error(e)
733 finally:
734 self.lock.acquire()
735 e.free()
736 return
737 elif xmlnode.name=="features":
738 self.__logger.debug("Got stream features")
739 self.__logger.debug("Node: %r" % (xmlnode,))
740 self.features=xmlnode.copyNode(1)
741 self.doc_in.addChild(self.features)
742 self._got_features()
743 return
744
745 self.__logger.debug("Unhandled stream node: %r" % (xmlnode.serialize(),))
746
748 """Process stream error element received.
749
750 :Types:
751 - `err`: `StreamErrorNode`
752
753 :Parameters:
754 - `err`: error received
755 """
756
757 self.__logger.debug("Unhandled stream error: condition: %s %r"
758 % (err.get_condition().name,err.serialize()))
759
761 """Check "to" attribute of received stream header.
762
763 :return: `to` if it is equal to `self.me`, None otherwise.
764
765 Should be overridden in derived classes which require other logic
766 for handling that attribute."""
767 if to!=self.me:
768 return None
769 return to
770
772 """Generate a random and unique stream ID.
773
774 :return: the id string generated."""
775 return "%i-%i-%s" % (os.getpid(),time.time(),str(random.random())[2:])
776
778 """Process incoming <stream:features/> element.
779
780 [initiating entity only]
781
782 The received features node is available in `self.features`."""
783 ctxt = self.doc_in.xpathNewContext()
784 ctxt.setContextNode(self.features)
785 ctxt.xpathRegisterNs("stream",STREAM_NS)
786 ctxt.xpathRegisterNs("bind",BIND_NS)
787 bind_n=None
788 try:
789 bind_n=ctxt.xpathEval("bind:bind")
790 finally:
791 ctxt.xpathFreeContext()
792
793 if self.authenticated:
794 if bind_n:
795 self.bind(self.me.resource)
796 else:
797 self.state_change("authorized",self.me)
798
799 - def bind(self,resource):
818
820 """Handle resource binding success.
821
822 [initiating entity only]
823
824 :Parameters:
825 - `stanza`: <iq type="result"/> stanza received.
826
827 Set `self.me` to the full JID negotiated."""
828 jid_n=stanza.xpath_eval("bind:bind/bind:jid",{"bind":BIND_NS})
829 if jid_n:
830 self.me=JID(jid_n[0].getContent().decode("utf-8"))
831 self.state_change("authorized",self.me)
832
834 """Handle resource binding failure.
835
836 [initiating entity only]
837
838 :raise FatalStreamError:"""
839 raise FatalStreamError,"Resource binding failed"
840
842 """Check if stream is connected.
843
844 :return: True if stream connection is active."""
845 if self.doc_in and self.doc_out and not self.eof:
846 return True
847 else:
848 return False
849
850
851