Class Jabber::Stream
In: lib/xmpp4r/stream.rb
Parent: Object
Message Presence XMPPStanza Iq Singleton IdGenerator XMPPElement X IqQuery Error Connection Client Component Client Comparable JID RuntimeError AuthenticationFailure ErrorException NoNameXmlnsRegistered SOCKS5Error REXML::Element Stream SOCKS5Bytestreams SOCKS5BytestreamsTarget SOCKS5BytestreamsInitiator XMPPElement StreamHost IqSiFileRange IqSiFile StreamHostUsed IqSi XRosterItem RosterItem IqFeature XMUCUserItem XMUCUserInvite IqPubSub Items Item Event Feature Item Identity XDataField XDataReported XDataTitle XDataInstructions IqVcard SOCKS5BytestreamsServerStreamHost TCPSocket SOCKS5Socket IqQuery IqQueryBytestreams IqQueryVersion IqQueryRoster IqQueryMUCOwner IqQueryRPC IqQueryDiscoItems IqQueryDiscoInfo IBB IBBTarget IBBInitiator Responder SimpleResponder Iq IqCommand RosterXItem XRoster RosterX X XMUCUser XMUC XDelay XData XParent MUCClient SimpleMUCClient XMLRPC::ParserWriterChooseMixin Client Server XMLRPC::ParseContentType XMLRPC::BasicServer Base DigestMD5 Plain ServiceHelper NodeHelper FileSource CallbackList Callback Semaphore StreamParser SOCKS5BytestreamsPeer SOCKS5BytestreamsServer IBBQueueItem Responder Helper MUCBrowser NodeBrowser Helper Helper lib/xmpp4r/authenticationfailure.rb lib/xmpp4r/xmppstanza.rb lib/xmpp4r/callbacks.rb lib/xmpp4r/idgenerator.rb lib/xmpp4r/connection.rb lib/xmpp4r/iq.rb lib/xmpp4r/jid.rb lib/xmpp4r/errorexception.rb lib/xmpp4r/client.rb lib/xmpp4r/stream.rb lib/xmpp4r/semaphore.rb lib/xmpp4r/streamparser.rb lib/xmpp4r/x.rb lib/xmpp4r/error.rb lib/xmpp4r/component.rb lib/xmpp4r/query.rb lib/xmpp4r/xmppelement.rb lib/xmpp4r/message.rb lib/xmpp4r/presence.rb lib/xmpp4r/bytestreams/helper/ibb/initiator.rb lib/xmpp4r/bytestreams/iq/si.rb lib/xmpp4r/bytestreams/iq/bytestreams.rb lib/xmpp4r/bytestreams/helper/socks5bytestreams/base.rb lib/xmpp4r/bytestreams/helper/socks5bytestreams/server.rb lib/xmpp4r/bytestreams/helper/socks5bytestreams/target.rb lib/xmpp4r/bytestreams/helper/socks5bytestreams/socks5.rb lib/xmpp4r/bytestreams/helper/socks5bytestreams/initiator.rb lib/xmpp4r/bytestreams/helper/ibb/base.rb lib/xmpp4r/bytestreams/helper/ibb/target.rb Bytestreams XParent lib/xmpp4r/version/iq/version.rb lib/xmpp4r/version/helper/responder.rb lib/xmpp4r/version/helper/simpleresponder.rb Version lib/xmpp4r/command/iq/command.rb lib/xmpp4r/command/helper/responder.rb Command lib/xmpp4r/roster/helper/roster.rb lib/xmpp4r/roster/iq/roster.rb lib/xmpp4r/roster/x/roster.rb Roster lib/xmpp4r/feature_negotiation/iq/feature.rb FeatureNegotiation lib/xmpp4r/muc/x/muc.rb lib/xmpp4r/muc/helper/mucclient.rb lib/xmpp4r/muc/x/mucuseritem.rb lib/xmpp4r/muc/helper/mucbrowser.rb lib/xmpp4r/muc/x/mucuserinvite.rb lib/xmpp4r/muc/iq/mucowner.rb lib/xmpp4r/muc/helper/simplemucclient.rb MUC lib/xmpp4r/rpc/helper/server.rb lib/xmpp4r/rpc/helper/client.rb lib/xmpp4r/rpc/iq/rpc.rb RPC lib/xmpp4r/sasl.rb SASL lib/xmpp4r/delay/x/delay.rb Delay lib/xmpp4r/pubsub/helper/servicehelper.rb lib/xmpp4r/pubsub/stanzas/item.rb lib/xmpp4r/pubsub/helper/nodehelper.rb lib/xmpp4r/pubsub/iq/pubsub.rb lib/xmpp4r/pubsub/stanzas/event.rb lib/xmpp4r/pubsub/helper/nodebrowser.rb lib/xmpp4r/pubsub/stanzas/items.rb PubSub lib/xmpp4r/httpbinding/client.rb HTTPBinding lib/xmpp4r/bytestreams/helper/filetransfer.rb TransferSource FileTransfer lib/xmpp4r/discovery/iq/discoinfo.rb lib/xmpp4r/discovery/iq/discoitems.rb Discovery lib/xmpp4r/dataforms/x/data.rb Dataforms lib/xmpp4r/vcard/helper/vcard.rb lib/xmpp4r/vcard/iq/vcard.rb Vcard Jabber dot/m_79_0.png

The stream class manages a connection stream (a file descriptor using which XML messages are read and sent)

You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and send_with_id methods.

To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.

Methods

Classes and Modules

Class Jabber::Stream::ThreadBlock

Constants

DISCONNECTED = 1
CONNECTED = 2

Attributes

fd  [R]  file descriptor used
status  [R]  connection status

Public Class methods

Create a new stream (just initializes)

[Source]

    # File lib/xmpp4r/stream.rb, line 42
42:     def initialize(threaded = true)
43:       unless threaded
44:         raise "Non-threaded mode was removed from XMPP4R."
45:       end
46:       @fd = nil
47:       @status = DISCONNECTED
48:       @xmlcbs = CallbackList::new
49:       @stanzacbs = CallbackList::new
50:       @messagecbs = CallbackList::new
51:       @iqcbs = CallbackList::new
52:       @presencecbs = CallbackList::new
53:       @send_lock = Mutex.new
54:       @last_send = Time.now
55:       @exception_block = nil
56:       @threadblocks = []
57:       @wakeup_thread = nil
58:       @streamid = nil
59:       @streamns = 'jabber:client'
60:       @features_sem = Semaphore.new
61:       @parser_thread = nil
62:     end

Public Instance methods

Adds a callback block to process received Iqs

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 466
466:     def add_iq_callback(priority = 0, ref = nil, &block)
467:       @iqcbs.add(priority, ref, block)
468:     end

Adds a callback block to process received Messages

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 412
412:     def add_message_callback(priority = 0, ref = nil, &block)
413:       @messagecbs.add(priority, ref, block)
414:     end

Adds a callback block to process received Presences

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 448
448:     def add_presence_callback(priority = 0, ref = nil, &block)
449:       @presencecbs.add(priority, ref, block)
450:     end

Adds a callback block to process received Stanzas

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 430
430:     def add_stanza_callback(priority = 0, ref = nil, &block)
431:       @stanzacbs.add(priority, ref, block)
432:     end

Adds a callback block to process received XML messages

priority:[Integer] The callback‘s priority, the higher, the sooner
ref:[String] The callback‘s reference
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 394
394:     def add_xml_callback(priority = 0, ref = nil, &block)
395:       @xmlcbs.add(priority, ref, block)
396:     end

Closes the connection to the Jabber service

[Source]

     # File lib/xmpp4r/stream.rb, line 480
480:     def close
481:       close!
482:     end

[Source]

     # File lib/xmpp4r/stream.rb, line 484
484:     def close!
485:       @parser_thread.kill if @parser_thread
486:       @fd.close if @fd and !@fd.closed?
487:       @status = DISCONNECTED
488:     end

Delete an Iq callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 475
475:     def delete_iq_callback(ref)
476:       @iqcbs.delete(ref)
477:     end

Delete an Message callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 420
420:     def delete_message_callback(ref)
421:       @messagecbs.delete(ref)
422:     end

Delete a Presence callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 456
456:     def delete_presence_callback(ref)
457:       @presencecbs.delete(ref)
458:     end

Delete a Stanza callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 438
438:     def delete_stanza_callback(ref)
439:       @stanzacbs.delete(ref)
440:     end

Delete an XML-messages callback

ref:[String] The reference of the callback to delete

[Source]

     # File lib/xmpp4r/stream.rb, line 402
402:     def delete_xml_callback(ref)
403:       @xmlcbs.delete(ref)
404:     end

Returns if this connection is connected to a Jabber service

return:[Boolean] Connection status

[Source]

     # File lib/xmpp4r/stream.rb, line 155
155:     def is_connected?
156:       return @status == CONNECTED
157:     end

Returns if this connection is NOT connected to a Jabber service

return:[Boolean] Connection status

[Source]

     # File lib/xmpp4r/stream.rb, line 163
163:     def is_disconnected?
164:       return @status == DISCONNECTED
165:     end

Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.

The block has to take three arguments:

  • the Exception
  • the Jabber::Stream object (self)
  • a symbol where it happened, namely :start, :parser, :sending and :end

[Source]

     # File lib/xmpp4r/stream.rb, line 111
111:     def on_exception(&block)
112:       @exception_block = block
113:     end

This method is called by the parser when a failure occurs

[Source]

     # File lib/xmpp4r/stream.rb, line 117
117:     def parse_failure(e)
118:       Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
119: 
120:       # A new thread has to be created because close will cause the thread
121:       # to commit suicide(???)
122:       if @exception_block
123:         # New thread, because close will kill the current thread
124:         Thread.new do
125:           Thread.current.abort_on_exception = true
126:           close
127:           @exception_block.call(e, self, :parser)
128:         end
129:       else
130:         puts "Stream#parse_failure was called by XML parser. Dumping " +
131:         "backtrace...\n" + e.exception + "\n"
132:         puts e.backtrace
133:         close
134:         raise
135:       end
136:     end

This method is called by the parser upon receiving </stream:stream>

[Source]

     # File lib/xmpp4r/stream.rb, line 140
140:     def parser_end
141:       if @exception_block
142:         Thread.new do
143:           Thread.current.abort_on_exception = true
144:           close
145:           @exception_block.call(nil, self, :close)
146:         end
147:       else
148:         close
149:       end
150:     end

Processes a received REXML::Element and executes registered thread blocks and filters against it.

element:[REXML::Element] The received element

[Source]

     # File lib/xmpp4r/stream.rb, line 172
172:     def receive(element)
173:       Jabber::debuglog("RECEIVED:\n#{element.to_s}")
174: 
175:       if element.namespace('').to_s == '' # REXML namespaces are always strings
176:         element.add_namespace(@streamns)
177:       end
178: 
179:       case element.prefix
180:       when 'stream'
181:         case element.name
182:           when 'stream'
183:             stanza = element
184:             @streamid = element.attributes['id']
185:             @streamns = element.namespace('') if element.namespace('')
186: 
187:             # Hack: component streams are basically client streams.
188:             # Someday we may want to create special stanza classes
189:             # for components/s2s deriving from normal stanzas but
190:             # posessing these namespaces
191:             @streamns = 'jabber:client' if @streamns == 'jabber:component:accept'
192: 
193:             unless element.attributes['version']  # isn't XMPP compliant, so
194:               Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
195:               @features_sem.run                   # don't wait for <stream:features/> 
196:             end
197:           when 'features'
198:             stanza = element
199:             element.each { |e|
200:               if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
201:                 e.each_element('mechanism') { |mech|
202:                   @stream_mechanisms.push(mech.text)
203:                 }
204:               else
205:                 @stream_features[e.name] = e.namespace
206:               end
207:             }
208:             Jabber::debuglog("FEATURES: received")
209:             @features_sem.run
210:           else
211:             stanza = element
212:         end
213:       else
214:         # Any stanza, classes are registered by XMPPElement::name_xmlns
215:         begin
216:           stanza = XMPPStanza::import(element)
217:         rescue NoNameXmlnsRegistered
218:           stanza = element
219:         end
220:       end
221: 
222:       # Iterate through blocked threads (= waiting for an answer)
223:       #
224:       # We're dup'ping the @threadblocks here, so that we won't end up in an
225:       # endless loop if Stream#send is being nested. That means, the nested
226:       # threadblock won't receive the stanza currently processed, but the next
227:       # one.
228:       threadblocks = @threadblocks.dup
229:       threadblocks.each { |threadblock|
230:         exception = nil
231:         r = false
232:         begin
233:           r = threadblock.call(stanza)
234:         rescue Exception => e
235:           exception = e
236:         end
237: 
238:         if r == true
239:           @threadblocks.delete(threadblock)
240:           threadblock.wakeup
241:           return
242:         elsif exception
243:           @threadblocks.delete(threadblock)
244:           threadblock.raise(exception)
245:         end
246:       }
247: 
248:       Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})")
249:       return true if @xmlcbs.process(stanza)
250:       return true if @stanzacbs.process(stanza)
251:       case stanza
252:       when Message
253:         return true if @messagecbs.process(stanza)
254:       when Iq
255:         return true if @iqcbs.process(stanza)
256:       when Presence
257:         return true if @presencecbs.process(stanza)
258:       end
259:     end

Sends XML data to the socket and (optionally) waits to process received data.

Do not invoke this in a callback but in a seperate thread because we may not suspend the parser-thread (in whose context callbacks are executed).

xml:[String] The xml data to send
&block:[Block] The optional block

[Source]

     # File lib/xmpp4r/stream.rb, line 306
306:     def send(xml, &block)
307:       Jabber::debuglog("SENDING:\n#{xml}")
308:       @threadblocks.unshift(threadblock = ThreadBlock.new(block)) if block
309:       begin
310:         # Temporarily remove stanza's namespace to
311:         # reduce bandwidth consumption
312:         if xml.kind_of? XMPPStanza and xml.namespace == 'jabber:client'
313:           xml.delete_namespace
314:           send_data(xml.to_s)
315:           xml.add_namespace(@streamns)
316:         else
317:           send_data(xml.to_s)
318:         end
319:       rescue Exception => e
320:         Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
321: 
322:         if @exception_block
323:           Thread.new do
324:             Thread.current.abort_on_exception = true
325:             close!
326:             @exception_block.call(e, self, :sending)
327:           end
328:         else
329:           puts "Exception caught while sending!"
330:           close!
331:           raise
332:         end
333:       end
334:       # The parser thread might be running this (think of a callback running send())
335:       # If this is the case, we mustn't stop (or we would cause a deadlock)
336:       if block and Thread.current != @parser_thread
337:         threadblock.wait
338:       elsif block
339:         Jabber::debuglog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!")
340:       end
341:     end

[Source]

     # File lib/xmpp4r/stream.rb, line 288
288:     def send_data(data)
289:       @send_lock.synchronize do
290:         @last_send = Time.now
291:         @fd << data
292:         @fd.flush
293:       end
294:     end

Send an XMMP stanza with an Jabber::XMPPStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned.

Be aware that if a stanza with type=‘error‘ is received the function does not yield but raises an ErrorException with the corresponding error element.

Please see Stream#send for some implementational details.

Please read the note about nesting at Stream#send

xml:[XMPPStanza]

[Source]

     # File lib/xmpp4r/stream.rb, line 360
360:     def send_with_id(xml, &block)
361:       if xml.id.nil?
362:         xml.id = Jabber::IdGenerator.instance.generate_id
363:       end
364: 
365:       res = nil
366:       error = nil
367:       send(xml) do |received|
368:         if received.kind_of? XMPPStanza and received.id == xml.id
369:           if received.type == :error
370:             error = (received.error ? received.error : Error.new)
371:             true
372:           else
373:             res = yield(received)
374:             true
375:           end
376:         else
377:           false
378:         end
379:       end
380: 
381:       unless error.nil?
382:         raise ErrorException.new(error)
383:       end
384: 
385:       res
386:     end

Start the XML parser on the fd

[Source]

    # File lib/xmpp4r/stream.rb, line 66
66:     def start(fd)
67:       @stream_mechanisms = []
68:       @stream_features = {}
69: 
70:       @fd = fd
71:       @parser = StreamParser.new(@fd, self)
72:       @parser_thread = Thread.new do
73:         Thread.current.abort_on_exception = true
74:         begin
75:           @parser.parse
76:         rescue Exception => e
77:           Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
78: 
79:           if @exception_block
80:             Thread.new do
81:               Thread.current.abort_on_exception = true
82:               close
83:               @exception_block.call(e, self, :start)
84:             end
85:           else
86:             puts "Exception caught in Parser thread! (#{e.class})"
87:             puts e.backtrace
88:             close
89:             raise
90:           end
91:         end
92:       end
93: 
94:       @status = CONNECTED
95:     end

[Source]

     # File lib/xmpp4r/stream.rb, line 97
 97:     def stop
 98:       @parser_thread.kill
 99:       @parser = nil
100:     end

[Validate]