Class Jabber::Stream
In: lib/xmpp4r/stream.rb
Parent: Object

The Stream class manages a connection stream: a file descriptor through which XML messages are read and sent.

You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and send_with_id methods.

To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.
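
The hand-off described above can be sketched without the library. This is a minimal illustration, not XMPP4R code: `jobs`, `results`, `callback` and the simulated `parser_thread` are hypothetical names, and a plain `Queue` stands in for whatever work a real callback would defer.

```ruby
require 'thread'

# Hypothetical stand-ins: the "parser thread" only enqueues work, and a
# separate worker thread performs the slow part.
jobs    = Queue.new
results = Queue.new

worker = Thread.new do
  # Runs outside the parser thread, so blocking here does not stall parsing.
  while (stanza = jobs.pop) != :stop
    sleep 0.01                      # pretend this is a slow operation
    results << "handled #{stanza}"
  end
end

# What a callback body could look like: return quickly, keep ordering.
callback = lambda do |stanza|
  jobs << stanza
  true                              # report the stanza as handled
end

# Simulated parser thread delivering stanzas in order.
parser_thread = Thread.new do
  %w[first second third].each { |s| callback.call(s) }
  jobs << :stop
end

parser_thread.join
worker.join

handled = []
handled << results.pop until results.empty?
```

Because a single worker drains a FIFO queue, the stanzas are still processed in the order they were received, while the callback itself returns immediately.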

Classes and Modules

Class Jabber::Stream::ThreadBlock

Constants

DISCONNECTED = 1
CONNECTED = 2

Attributes

fd  [R]  file descriptor used
status  [R]  connection status

Public Class methods

Create a new stream (just initializes)

    # File lib/xmpp4r/stream.rb, line 42
    def initialize(threaded = true)
      unless threaded
        raise "Non-threaded mode was removed from XMPP4R."
      end
      @fd = nil
      @status = DISCONNECTED
      @xmlcbs = CallbackList::new
      @stanzacbs = CallbackList::new
      @messagecbs = CallbackList::new
      @iqcbs = CallbackList::new
      @presencecbs = CallbackList::new
      @send_lock = Mutex.new
      @last_send = Time.now
      @exception_block = nil
      @threadblocks = []
      @wakeup_thread = nil
      @streamid = nil
      @streamns = 'jabber:client'
      @features_sem = Semaphore.new
      @parser_thread = nil
    end

Public Instance methods

Adds a callback block to process received Iqs

priority:[Integer] The callback's priority; the higher the priority, the sooner the callback is called
ref:[String] The callback's reference
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 478
    def add_iq_callback(priority = 0, ref = nil, &block)
      @iqcbs.add(priority, ref, block)
    end
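
The priority and reference arguments can be illustrated with a small stand-in. `MiniCallbackList` below is hypothetical and is not the XMPP4R `CallbackList`; it only assumes what this page states or implies: higher priorities run sooner, `ref` identifies a callback for deletion, and processing stops once a callback returns true.

```ruby
# Hypothetical stand-in for a callback list; NOT the xmpp4r CallbackList.
class MiniCallbackList
  Entry = Struct.new(:priority, :ref, :block)

  def initialize
    @entries = []
  end

  # Same argument order as the add(priority, ref, block) calls on this page.
  def add(priority = 0, ref = nil, block = nil)
    @entries << Entry.new(priority, ref, block)
    # Highest priority first; the index keeps insertion order for ties.
    @entries = @entries.each_with_index
                       .sort_by { |entry, index| [-entry.priority, index] }
                       .map(&:first)
    self
  end

  # Remove every callback registered under the given reference.
  def delete(ref)
    @entries.reject! { |entry| entry.ref == ref }
    self
  end

  # Call blocks in priority order; stop once one returns true.
  def process(stanza)
    @entries.each { |entry| return true if entry.block.call(stanza) == true }
    false
  end
end

calls = []
list = MiniCallbackList.new
list.add(0,  'low',  proc { |_stanza| calls << :low;  false })
list.add(10, 'high', proc { |_stanza| calls << :high; false })

first_result = list.process('<iq/>')   # runs :high, then :low
list.delete('high')
list.process('<iq/>')                  # runs :low only
```

With a real stream the equivalent registration would presumably be `stream.add_iq_callback(10, 'high') { |iq| ... }`, with the same reference later passed to `delete_iq_callback`.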

Adds a callback block to process received Messages

priority:[Integer] The callback's priority; the higher the priority, the sooner the callback is called
ref:[String] The callback's reference
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 424
    def add_message_callback(priority = 0, ref = nil, &block)
      @messagecbs.add(priority, ref, block)
    end

Adds a callback block to process received Presences

priority:[Integer] The callback's priority; the higher the priority, the sooner the callback is called
ref:[String] The callback's reference
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 460
    def add_presence_callback(priority = 0, ref = nil, &block)
      @presencecbs.add(priority, ref, block)
    end

Adds a callback block to process received Stanzas

priority:[Integer] The callback's priority; the higher the priority, the sooner the callback is called
ref:[String] The callback's reference
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 442
    def add_stanza_callback(priority = 0, ref = nil, &block)
      @stanzacbs.add(priority, ref, block)
    end

Adds a callback block to process received XML messages

priority:[Integer] The callback's priority; the higher the priority, the sooner the callback is called
ref:[String] The callback's reference
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 406
    def add_xml_callback(priority = 0, ref = nil, &block)
      @xmlcbs.add(priority, ref, block)
    end

Closes the connection to the Jabber service

    # File lib/xmpp4r/stream.rb, line 492
    def close
      close!
    end

    # File lib/xmpp4r/stream.rb, line 496
    def close!
      @parser_thread.kill if @parser_thread
      @fd.close if @fd and !@fd.closed?
      @status = DISCONNECTED
    end

Delete an Iq callback

ref:[String] The reference of the callback to delete

    # File lib/xmpp4r/stream.rb, line 487
    def delete_iq_callback(ref)
      @iqcbs.delete(ref)
    end

Delete a Message callback

ref:[String] The reference of the callback to delete

    # File lib/xmpp4r/stream.rb, line 432
    def delete_message_callback(ref)
      @messagecbs.delete(ref)
    end

Delete a Presence callback

ref:[String] The reference of the callback to delete

    # File lib/xmpp4r/stream.rb, line 468
    def delete_presence_callback(ref)
      @presencecbs.delete(ref)
    end

Delete a Stanza callback

ref:[String] The reference of the callback to delete

    # File lib/xmpp4r/stream.rb, line 450
    def delete_stanza_callback(ref)
      @stanzacbs.delete(ref)
    end

Delete an XML message callback

ref:[String] The reference of the callback to delete

    # File lib/xmpp4r/stream.rb, line 414
    def delete_xml_callback(ref)
      @xmlcbs.delete(ref)
    end

Returns whether this stream is connected to a Jabber service

return:[Boolean] true if the status is CONNECTED

    # File lib/xmpp4r/stream.rb, line 164
    def is_connected?
      return @status == CONNECTED
    end

Returns whether this stream is NOT connected to a Jabber service

return:[Boolean] true if the status is DISCONNECTED

    # File lib/xmpp4r/stream.rb, line 172
    def is_disconnected?
      return @status == DISCONNECTED
    end

Registers a block that handles exceptions raised while the stream is being parsed or written to. This will likely be the first indication that the socket dropped during a Jabber session.

The block has to take three arguments:

  • the Exception (nil when the stream simply ended)
  • the Jabber::Stream object (self)
  • a symbol indicating where it happened; the listings on this page pass :start, :parser, :sending, :close and :disconnected

    # File lib/xmpp4r/stream.rb, line 120
    def on_exception(&block)
      @exception_block = block
    end
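
A handler with the three-argument shape described above might look like the following sketch. The `handler` lambda and the `events` array are hypothetical; the symbols are the ones passed to the exception block in the listings on this page, and the calls at the end merely simulate what the stream would do.

```ruby
# Hypothetical handler; nothing here touches a real Jabber::Stream.
events = []

handler = lambda do |exception, stream, where|
  case where
  when :start, :parser, :sending
    # An exception object is passed for these stages.
    events << "error during #{where}: #{exception.class}"
  when :close, :disconnected
    # The stream ended; the exception argument is nil here.
    events << "stream ended (#{where})"
  else
    events << "unknown stage #{where.inspect}"
  end
end

# Simulated invocations (the stream argument is left as nil in this sketch).
handler.call(IOError.new('closed stream'), nil, :sending)
handler.call(nil, nil, :disconnected)
```

With a real stream the lambda would presumably be registered as `stream.on_exception(&handler)`. Note that the listings call `close` or `close!` before invoking the block, so the stream is already disconnected when the handler runs.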

This method is called by the parser when a failure occurs

    # File lib/xmpp4r/stream.rb, line 126
    def parse_failure(e)
      Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      # A new thread has to be created because close will cause the thread
      # to commit suicide(???)
      if @exception_block
        # New thread, because close will kill the current thread
        Thread.new do
          Thread.current.abort_on_exception = true
          close
          @exception_block.call(e, self, :parser)
        end
      else
        puts "Stream#parse_failure was called by XML parser. Dumping " +
        "backtrace...\n" + e.exception + "\n"
        puts e.backtrace
        close
        raise
      end
    end

This method is called by the parser upon receiving </stream:stream>

    # File lib/xmpp4r/stream.rb, line 149
    def parser_end
      if @exception_block
        Thread.new do
          Thread.current.abort_on_exception = true
          close
          @exception_block.call(nil, self, :close)
        end
      else
        close
      end
    end

Processes a received REXML::Element and executes registered thread blocks and filters against it.

element:[REXML::Element] The received element

    # File lib/xmpp4r/stream.rb, line 181
    def receive(element)
      Jabber::debuglog("RECEIVED:\n#{element.to_s}")

      if element.namespace('').to_s == '' # REXML namespaces are always strings
        element.add_namespace(@streamns)
      end

      case element.prefix
      when 'stream'
        case element.name
          when 'stream'
            stanza = element
            @streamid = element.attributes['id']
            @streamns = element.namespace('') if element.namespace('')

            # Hack: component streams are basically client streams.
            # Someday we may want to create special stanza classes
            # for components/s2s deriving from normal stanzas but
            # possessing these namespaces
            @streamns = 'jabber:client' if @streamns == 'jabber:component:accept'

            unless element.attributes['version']  # isn't XMPP compliant, so
              Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
              @features_sem.run                   # don't wait for <stream:features/>
            end
          when 'features'
            stanza = element
            element.each { |e|
              if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
                e.each_element('mechanism') { |mech|
                  @stream_mechanisms.push(mech.text)
                }
              else
                @stream_features[e.name] = e.namespace
              end
            }
            Jabber::debuglog("FEATURES: received")
            @features_sem.run
          else
            stanza = element
        end
      else
        # Any stanza, classes are registered by XMPPElement::name_xmlns
        begin
          stanza = XMPPStanza::import(element)
        rescue NoNameXmlnsRegistered
          stanza = element
        end
      end

      # Iterate through blocked threads (= waiting for an answer)
      #
      # We're dup'ping the @threadblocks here, so that we won't end up in an
      # endless loop if Stream#send is being nested. That means, the nested
      # threadblock won't receive the stanza currently processed, but the next
      # one.
      threadblocks = @threadblocks.dup
      threadblocks.each { |threadblock|
        exception = nil
        r = false
        begin
          r = threadblock.call(stanza)
        rescue Exception => e
          exception = e
        end

        if r == true
          @threadblocks.delete(threadblock)
          threadblock.wakeup
          return
        elsif exception
          @threadblocks.delete(threadblock)
          threadblock.raise(exception)
        end
      }

      Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})")
      return true if @xmlcbs.process(stanza)
      return true if @stanzacbs.process(stanza)
      case stanza
      when Message
        return true if @messagecbs.process(stanza)
      when Iq
        return true if @iqcbs.process(stanza)
      when Presence
        return true if @presencecbs.process(stanza)
      end
    end
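
The order in which the tail of `receive` consults the callback lists can be sketched as follows. The lambdas are hypothetical stand-ins for the registered callback lists, and a `kind` symbol replaces the class check on the stanza; the sketch returns false for an unhandled stanza, which is a simplification.

```ruby
# Hypothetical handlers standing in for the registered callback lists.
order = []
xml_cbs     = lambda { |_stanza| order << :xml;     false }
stanza_cbs  = lambda { |_stanza| order << :stanza;  false }
message_cbs = lambda { |_stanza| order << :message; true  }
iq_cbs      = lambda { |_stanza| order << :iq;      true  }

# Generic handlers run first, then one type-specific handler chosen by
# the stanza kind. A true result from any handler ends the dispatch.
dispatch = lambda do |kind, stanza|
  return true if xml_cbs.call(stanza)
  return true if stanza_cbs.call(stanza)
  case kind
  when :message then message_cbs.call(stanza)
  when :iq      then iq_cbs.call(stanza)
  else false
  end
end

handled   = dispatch.call(:message, '<message/>')
unhandled = dispatch.call(:presence, '<presence/>')
```

An XML or stanza callback that returns true therefore prevents the message, iq and presence callbacks from seeing the stanza at all.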

Sends XML data to the socket and (optionally) waits to process received data.

Do not invoke this from a callback; use a separate thread instead, because the parser thread (in whose context callbacks are executed) must not be suspended.

xml:[String] The xml data to send
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 315
    def send(xml, &block)
      Jabber::debuglog("SENDING:\n#{xml}")
      @threadblocks.unshift(threadblock = ThreadBlock.new(block)) if block
      begin
        # Temporarily remove stanza's namespace to
        # reduce bandwidth consumption
        if xml.kind_of? XMPPStanza and xml.namespace == 'jabber:client'
          xml.delete_namespace
          send_data(xml.to_s)
          xml.add_namespace(@streamns)
        else
          send_data(xml.to_s)
        end
      rescue Exception => e
        Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

        if @exception_block
          Thread.new do
            Thread.current.abort_on_exception = true
            close!
            @exception_block.call(e, self, :sending)
          end
        else
          if Jabber::debug
            puts "Exception caught while sending! (#{e.class})"
            puts e.backtrace
          end
          close!
          raise
        end
      end
      # The parser thread might be running this (think of a callback running send())
      # If this is the case, we mustn't stop (or we would cause a deadlock)
      if block and Thread.current != @parser_thread
        threadblock.wait
      elsif block
        Jabber::debuglog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!")
      end
    end
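
The wait-and-wakeup interplay between `send` and `receive` can be sketched with a small stand-in. `MiniWaiter` is hypothetical and is not the library's `ThreadBlock`; it uses a `Queue` as the wakeup signal, which is an assumption of this sketch rather than a description of the actual implementation.

```ruby
require 'thread'

# Hypothetical stand-in for the wait/wakeup pairing; NOT xmpp4r's ThreadBlock.
class MiniWaiter
  def initialize(block)
    @block = block
    @done  = Queue.new
  end

  # Called by the receiving side for every incoming item.
  def call(item)
    @block.call(item)
  end

  # Called by the receiving side once the block returned true.
  def wakeup
    @done << true
  end

  # Called by the sending side; blocks until wakeup.
  def wait
    @done.pop
  end
end

waiters = []
seen    = []
waiter  = MiniWaiter.new(proc { |item| seen << item; item == 'pong' })
waiters.unshift(waiter)              # registered before anything is "sent"

# Simulated parser thread: offers each item to the registered waiters and
# releases a waiter as soon as its block returns true.
receiver = Thread.new do
  %w[noise pong late].each do |item|
    waiters.dup.each do |w|
      next unless w.call(item) == true
      waiters.delete(w)
      w.wakeup
    end
  end
end

waiter.wait                          # the "sending" thread blocks here
receiver.join
```

The sketch also shows why waiting inside the parser thread would deadlock: the only thread able to call `wakeup` would be the one stuck in `wait`.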

    # File lib/xmpp4r/stream.rb, line 297
    def send_data(data)
      @send_lock.synchronize do
        @last_send = Time.now
        @fd << data
        @fd.flush
      end
    end
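
The purpose of the lock in `send_data` can be sketched with a `StringIO` playing the role of the socket. All names below are hypothetical; the point is only that each write plus flush happens as one unit, so fragments from different threads do not interleave.

```ruby
require 'stringio'

# Hypothetical stand-in: a StringIO plays the role of the socket (@fd).
fd   = StringIO.new
lock = Mutex.new

send_data = lambda do |data|
  # Only one thread at a time may write and flush.
  lock.synchronize do
    fd << data
    fd.flush
  end
end

# Four threads each write 25 small fragments concurrently.
threads = 4.times.map do |i|
  Thread.new { 25.times { send_data.call("<m#{i}/>") } }
end
threads.each(&:join)

# Every fragment should appear intact in the output.
written = fd.string.scan(/<m\d\/>/)
```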

Sends an XMPP stanza with a Jabber::XMPPStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned.

Be aware that if a stanza with type='error' is received, the function does not yield but raises an ErrorException with the corresponding error element.

Please see Stream#send for some implementation details.

Please read the note about nesting at Stream#send

xml:[XMPPStanza]

    # File lib/xmpp4r/stream.rb, line 372
    def send_with_id(xml, &block)
      if xml.id.nil?
        xml.id = Jabber::IdGenerator.instance.generate_id
      end

      res = nil
      error = nil
      send(xml) do |received|
        if received.kind_of? XMPPStanza and received.id == xml.id
          if received.type == :error
            error = (received.error ? received.error : Error.new)
            true
          else
            res = yield(received)
            true
          end
        else
          false
        end
      end

      unless error.nil?
        raise ErrorException.new(error)
      end

      res
    end
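
The id matching and error handling in `send_with_id` can be sketched on plain data. `Reply`, `ReplyError` and `await_reply` are hypothetical names introduced for illustration; real code would work with stanza objects and ErrorException, and the replies would arrive over the stream rather than from an array.

```ruby
# Hypothetical reply records; real code would use stanza objects.
Reply = Struct.new(:id, :type, :payload)

# Hypothetical error class standing in for ErrorException.
class ReplyError < StandardError; end

# Scans incoming replies for the one whose id matches. Raises on an
# error reply, otherwise returns the value of the caller's block.
def await_reply(id, incoming)
  incoming.each do |reply|
    next unless reply.id == id          # unrelated stanzas are skipped
    raise ReplyError, "error reply for #{id}" if reply.type == :error
    return yield(reply)
  end
  nil
end

incoming = [
  Reply.new('a1', :result, 'roster'),
  Reply.new('a2', :error,  nil)
]

value = await_reply('a1', incoming) { |reply| reply.payload.upcase }

failed = begin
  await_reply('a2', incoming) { |reply| reply.payload }
  false
rescue ReplyError
  true
end
```

As in the listing, the block's return value becomes the method's return value, and an error reply never reaches the block.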

Start the XML parser on the fd

    # File lib/xmpp4r/stream.rb, line 66
    def start(fd)
      @stream_mechanisms = []
      @stream_features = {}

      @fd = fd
      @parser = StreamParser.new(@fd, self)
      @parser_thread = Thread.new do
        Thread.current.abort_on_exception = true
        begin
          @parser.parse
          Jabber::debuglog("DISCONNECTED\n")

          if @exception_block
            Thread.new { close!; @exception_block.call(nil, self, :disconnected) }
          else
            close!
          end
        rescue Exception => e
          Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

          if @exception_block
            Thread.new do
              Thread.current.abort_on_exception = true
              close
              @exception_block.call(e, self, :start)
            end
          else
            if Jabber::debug
              puts "Exception caught in Parser thread! (#{e.class})"
              puts e.backtrace
            end
            close!
            raise
          end
        end
      end

      @status = CONNECTED
    end

    # File lib/xmpp4r/stream.rb, line 106
    def stop
      @parser_thread.kill
      @parser = nil
    end
