Class | Jabber::Stream |
In: |
lib/xmpp4r/stream.rb
|
Parent: | Object |
The stream class manages a connection stream (a file descriptor using which XML messages are read and sent)
You may register callbacks for the three Jabber stanzas (message, presence and iq) and use the send and send_with_id methods.
To ensure the order of received stanzas, callback blocks are launched in the parser thread. If further blocking operations are intended in those callbacks, run your own thread there.
DISCONNECTED | = | 1 |
CONNECTED | = | 2 |
fd | [R] | file descriptor used |
status | [R] | connection status |
Create a new stream (just initializes)
# File lib/xmpp4r/stream.rb, line 42 42: def initialize(threaded = true) 43: unless threaded 44: raise "Non-threaded mode was removed from XMPP4R." 45: end 46: @fd = nil 47: @status = DISCONNECTED 48: @xmlcbs = CallbackList::new 49: @stanzacbs = CallbackList::new 50: @messagecbs = CallbackList::new 51: @iqcbs = CallbackList::new 52: @presencecbs = CallbackList::new 53: @send_lock = Mutex.new 54: @last_send = Time.now 55: @exception_block = nil 56: @threadblocks = [] 57: @wakeup_thread = nil 58: @streamid = nil 59: @streamns = 'jabber:client' 60: @features_sem = Semaphore.new 61: @parser_thread = nil 62: end
Adds a callback block to process received Iqs
priority: | [Integer] The callback‘s priority, the higher, the sooner |
ref: | [String] The callback‘s reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 478 478: def add_iq_callback(priority = 0, ref = nil, &block) 479: @iqcbs.add(priority, ref, block) 480: end
Adds a callback block to process received Messages
priority: | [Integer] The callback‘s priority, the higher, the sooner |
ref: | [String] The callback‘s reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 424 424: def add_message_callback(priority = 0, ref = nil, &block) 425: @messagecbs.add(priority, ref, block) 426: end
Adds a callback block to process received Presences
priority: | [Integer] The callback‘s priority, the higher, the sooner |
ref: | [String] The callback‘s reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 460 460: def add_presence_callback(priority = 0, ref = nil, &block) 461: @presencecbs.add(priority, ref, block) 462: end
Adds a callback block to process received Stanzas
priority: | [Integer] The callback‘s priority, the higher, the sooner |
ref: | [String] The callback‘s reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 442 442: def add_stanza_callback(priority = 0, ref = nil, &block) 443: @stanzacbs.add(priority, ref, block) 444: end
Adds a callback block to process received XML messages
priority: | [Integer] The callback‘s priority, the higher, the sooner |
ref: | [String] The callback‘s reference |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 406 406: def add_xml_callback(priority = 0, ref = nil, &block) 407: @xmlcbs.add(priority, ref, block) 408: end
# File lib/xmpp4r/stream.rb, line 496 496: def close! 497: @parser_thread.kill if @parser_thread 498: @fd.close if @fd and !@fd.closed? 499: @status = DISCONNECTED 500: end
Delete a Stanza callback
ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 450 450: def delete_stanza_callback(ref) 451: @stanzacbs.delete(ref) 452: end
Delete an XML-messages callback
ref: | [String] The reference of the callback to delete |
# File lib/xmpp4r/stream.rb, line 414 414: def delete_xml_callback(ref) 415: @xmlcbs.delete(ref) 416: end
Returns if this connection is connected to a Jabber service
return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 164 164: def is_connected? 165: return @status == CONNECTED 166: end
Returns if this connection is NOT connected to a Jabber service
return: | [Boolean] Connection status |
# File lib/xmpp4r/stream.rb, line 172 172: def is_disconnected? 173: return @status == DISCONNECTED 174: end
Mounts a block to handle exceptions if they occur during the poll send. This will likely be the first indication that the socket dropped in a Jabber Session.
The block has to take three arguments:
# File lib/xmpp4r/stream.rb, line 120 120: def on_exception(&block) 121: @exception_block = block 122: end
This method is called by the parser when a failure occurs
# File lib/xmpp4r/stream.rb, line 126 126: def parse_failure(e) 127: Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}") 128: 129: # A new thread has to be created because close will cause the thread 130: # to commit suicide(???) 131: if @exception_block 132: # New thread, because close will kill the current thread 133: Thread.new do 134: Thread.current.abort_on_exception = true 135: close 136: @exception_block.call(e, self, :parser) 137: end 138: else 139: puts "Stream#parse_failure was called by XML parser. Dumping " + 140: "backtrace...\n" + e.exception + "\n" 141: puts e.backtrace 142: close 143: raise 144: end 145: end
This method is called by the parser upon receiving </stream:stream>
# File lib/xmpp4r/stream.rb, line 149 149: def parser_end 150: if @exception_block 151: Thread.new do 152: Thread.current.abort_on_exception = true 153: close 154: @exception_block.call(nil, self, :close) 155: end 156: else 157: close 158: end 159: end
Processes a received REXML::Element and executes registered thread blocks and filters against it.
element: | [REXML::Element] The received element |
# File lib/xmpp4r/stream.rb, line 181 181: def receive(element) 182: Jabber::debuglog("RECEIVED:\n#{element.to_s}") 183: 184: if element.namespace('').to_s == '' # REXML namespaces are always strings 185: element.add_namespace(@streamns) 186: end 187: 188: case element.prefix 189: when 'stream' 190: case element.name 191: when 'stream' 192: stanza = element 193: @streamid = element.attributes['id'] 194: @streamns = element.namespace('') if element.namespace('') 195: 196: # Hack: component streams are basically client streams. 197: # Someday we may want to create special stanza classes 198: # for components/s2s deriving from normal stanzas but 199: # posessing these namespaces 200: @streamns = 'jabber:client' if @streamns == 'jabber:component:accept' 201: 202: unless element.attributes['version'] # isn't XMPP compliant, so 203: Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features") 204: @features_sem.run # don't wait for <stream:features/> 205: end 206: when 'features' 207: stanza = element 208: element.each { |e| 209: if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl' 210: e.each_element('mechanism') { |mech| 211: @stream_mechanisms.push(mech.text) 212: } 213: else 214: @stream_features[e.name] = e.namespace 215: end 216: } 217: Jabber::debuglog("FEATURES: received") 218: @features_sem.run 219: else 220: stanza = element 221: end 222: else 223: # Any stanza, classes are registered by XMPPElement::name_xmlns 224: begin 225: stanza = XMPPStanza::import(element) 226: rescue NoNameXmlnsRegistered 227: stanza = element 228: end 229: end 230: 231: # Iterate through blocked threads (= waiting for an answer) 232: # 233: # We're dup'ping the @threadblocks here, so that we won't end up in an 234: # endless loop if Stream#send is being nested. That means, the nested 235: # threadblock won't receive the stanza currently processed, but the next 236: # one. 237: threadblocks = @threadblocks.dup 238: threadblocks.each { |threadblock| 239: exception = nil 240: r = false 241: begin 242: r = threadblock.call(stanza) 243: rescue Exception => e 244: exception = e 245: end 246: 247: if r == true 248: @threadblocks.delete(threadblock) 249: threadblock.wakeup 250: return 251: elsif exception 252: @threadblocks.delete(threadblock) 253: threadblock.raise(exception) 254: end 255: } 256: 257: Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})") 258: return true if @xmlcbs.process(stanza) 259: return true if @stanzacbs.process(stanza) 260: case stanza 261: when Message 262: return true if @messagecbs.process(stanza) 263: when Iq 264: return true if @iqcbs.process(stanza) 265: when Presence 266: return true if @presencecbs.process(stanza) 267: end 268: end
Sends XML data to the socket and (optionally) waits to process received data.
Do not invoke this in a callback but in a seperate thread because we may not suspend the parser-thread (in whose context callbacks are executed).
xml: | [String] The xml data to send |
&block: | [Block] The optional block |
# File lib/xmpp4r/stream.rb, line 315 315: def send(xml, &block) 316: Jabber::debuglog("SENDING:\n#{xml}") 317: @threadblocks.unshift(threadblock = ThreadBlock.new(block)) if block 318: begin 319: # Temporarily remove stanza's namespace to 320: # reduce bandwidth consumption 321: if xml.kind_of? XMPPStanza and xml.namespace == 'jabber:client' 322: xml.delete_namespace 323: send_data(xml.to_s) 324: xml.add_namespace(@streamns) 325: else 326: send_data(xml.to_s) 327: end 328: rescue Exception => e 329: Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}") 330: 331: if @exception_block 332: Thread.new do 333: Thread.current.abort_on_exception = true 334: close! 335: @exception_block.call(e, self, :sending) 336: end 337: else 338: if Jabber::debug 339: puts "Exception caught while sending! (#{e.class})" 340: puts e.backtrace 341: end 342: close! 343: raise 344: end 345: end 346: # The parser thread might be running this (think of a callback running send()) 347: # If this is the case, we mustn't stop (or we would cause a deadlock) 348: if block and Thread.current != @parser_thread 349: threadblock.wait 350: elsif block 351: Jabber::debuglog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!") 352: end 353: end
# File lib/xmpp4r/stream.rb, line 297 297: def send_data(data) 298: @send_lock.synchronize do 299: @last_send = Time.now 300: @fd << data 301: @fd.flush 302: end 303: end
Send an XMMP stanza with an Jabber::XMPPStanza#id. The id will be generated by Jabber::IdGenerator if not already set.
The block will be called once: when receiving a stanza with the same Jabber::XMPPStanza#id. There is no need to return true to complete this! Instead the return value of the block will be returned.
Be aware that if a stanza with type=‘error‘ is received the function does not yield but raises an ErrorException with the corresponding error element.
Please see Stream#send for some implementational details.
Please read the note about nesting at Stream#send
xml: | [XMPPStanza] |
# File lib/xmpp4r/stream.rb, line 372 372: def send_with_id(xml, &block) 373: if xml.id.nil? 374: xml.id = Jabber::IdGenerator.instance.generate_id 375: end 376: 377: res = nil 378: error = nil 379: send(xml) do |received| 380: if received.kind_of? XMPPStanza and received.id == xml.id 381: if received.type == :error 382: error = (received.error ? received.error : Error.new) 383: true 384: else 385: res = yield(received) 386: true 387: end 388: else 389: false 390: end 391: end 392: 393: unless error.nil? 394: raise ErrorException.new(error) 395: end 396: 397: res 398: end
Start the XML parser on the fd
# File lib/xmpp4r/stream.rb, line 66 66: def start(fd) 67: @stream_mechanisms = [] 68: @stream_features = {} 69: 70: @fd = fd 71: @parser = StreamParser.new(@fd, self) 72: @parser_thread = Thread.new do 73: Thread.current.abort_on_exception = true 74: begin 75: @parser.parse 76: Jabber::debuglog("DISCONNECTED\n") 77: 78: if @exception_block 79: Thread.new { close!; @exception_block.call(nil, self, :disconnected) } 80: else 81: close! 82: end 83: rescue Exception => e 84: Jabber::debuglog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}") 85: 86: if @exception_block 87: Thread.new do 88: Thread.current.abort_on_exception = true 89: close 90: @exception_block.call(e, self, :start) 91: end 92: else 93: if Jabber::debug 94: puts "Exception caught in Parser thread! (#{e.class})" 95: puts e.backtrace 96: end 97: close! 98: raise 99: end 100: end 101: end 102: 103: @status = CONNECTED 104: end