Class Jabber::Stream
In: lib/xmpp4r/stream.rb
Parent: Object

The Stream class manages a connection stream: a file descriptor through which XML messages are read and sent.

Constants

DISCONNECTED = 1
CONNECTED = 2

Attributes

fd  [R]  file descriptor used
status  [R]  connection status

Public Class methods

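Creates a new stream. This only initializes internal state; call start to attach a file descriptor.

threaded:[Boolean = true] If true, received stanzas are dispatched to callbacks as soon as they are parsed; if false, they are queued until process or wait_and_process is called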

    # File lib/xmpp4r/stream.rb, line 34
    def initialize(threaded = true)
      @fd = nil
      @status = DISCONNECTED
      @xmlcbs = CallbackList::new
      @stanzacbs = CallbackList::new
      @messagecbs = CallbackList::new
      @iqcbs = CallbackList::new
      @presencecbs = CallbackList::new
      @threaded = threaded
      @StanzaQueue = []
      @StanzaQueueMutex = Mutex::new
      @exception_block = nil
      @threadBlocks = {}
      # @pollCounter = 10
      @waitingThread = nil
      @wakeupThread = nil
      @streamid = nil
    end

Public Instance methods

Adds a callback block/proc to process received Iqs

priority:[Integer] The callback’s priority; callbacks with a higher priority are called first
ref:[String] The callback’s reference
proc:[Proc = nil] The optional proc; takes precedence over the block if both are given
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 410
    def add_iq_callback(priority = 0, ref = nil, proc=nil, &block)
      block = proc if proc
      @iqcbs.add(priority, ref, block)
    end
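
The priority and reference arguments can be illustrated with a small stand-in for the callback list. This is only a sketch: `SketchCallbackList` is a hypothetical class written for this example, not the library's CallbackList, and it assumes that callbacks run in descending priority order and that processing stops at the first callback returning true.

```ruby
# Hypothetical stand-in for a callback list; not the library's CallbackList.
class SketchCallbackList
  Entry = Struct.new(:priority, :ref, :block)

  def initialize
    @entries = []
  end

  # Same argument pattern as the add_*_callback methods: an explicit proc
  # wins over the block when both are given.
  def add(priority = 0, ref = nil, proc = nil, &block)
    block = proc if proc
    @entries << Entry.new(priority, ref, block)
    # Stable ordering: higher priority first, insertion order among equals.
    @entries = @entries.each_with_index
                       .sort_by { |entry, index| [-entry.priority, index] }
                       .map(&:first)
    self
  end

  # Removes every callback registered under the given reference.
  def delete(ref)
    @entries.reject! { |entry| entry.ref == ref }
    self
  end

  # Calls the callbacks in order; stops as soon as one returns true
  # (assumed behaviour, see the lead-in).
  def process(element)
    @entries.each do |entry|
      return true if entry.block.call(element) == true
    end
    false
  end
end

seen = []
list = SketchCallbackList.new
list.add(0, 'low')   { |iq| seen << :low;  false }
list.add(10, 'high') { |iq| seen << :high; false }
list.process('<iq/>')
p seen  # prints [:high, :low]
```

A callback that returns true marks the element as handled, so lower-priority callbacks never see it in this sketch.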

Adds a callback block/proc to process received Messages

priority:[Integer] The callback’s priority; callbacks with a higher priority are called first
ref:[String] The callback’s reference
proc:[Proc = nil] The optional proc; takes precedence over the block if both are given
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 350
    def add_message_callback(priority = 0, ref = nil, proc=nil, &block)
      block = proc if proc
      @messagecbs.add(priority, ref, block)
    end

Adds a callback block/proc to process received Presences

priority:[Integer] The callback’s priority; callbacks with a higher priority are called first
ref:[String] The callback’s reference
proc:[Proc = nil] The optional proc; takes precedence over the block if both are given
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 390
    def add_presence_callback(priority = 0, ref = nil, proc=nil, &block)
      block = proc if proc
      @presencecbs.add(priority, ref, block)
    end

Adds a callback block/proc to process received Stanzas

priority:[Integer] The callback’s priority; callbacks with a higher priority are called first
ref:[String] The callback’s reference
proc:[Proc = nil] The optional proc; takes precedence over the block if both are given
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 370
    def add_stanza_callback(priority = 0, ref = nil, proc=nil, &block)
      block = proc if proc
      @stanzacbs.add(priority, ref, block)
    end

Adds a callback block/proc to process received XML messages

priority:[Integer] The callback’s priority; callbacks with a higher priority are called first
ref:[String] The callback’s reference
proc:[Proc = nil] The optional proc; takes precedence over the block if both are given
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 330
    def add_xml_callback(priority = 0, ref = nil, proc=nil, &block)
      block = proc if proc
      @xmlcbs.add(priority, ref, block)
    end

Closes the connection to the Jabber service

    # File lib/xmpp4r/stream.rb, line 425
    def close
      @parserThread.kill if @parserThread
      # @pollThread.kill
      @fd.close if @fd
      @status = DISCONNECTED
    end

Delete an Iq callback

ref:[String] The reference of the callback to delete

    # File lib/xmpp4r/stream.rb, line 420
    def delete_iq_callback(ref)
      @iqcbs.delete(ref)
    end

Delete a Message callback

ref:[String] The reference of the callback to delete

    # File lib/xmpp4r/stream.rb, line 359
    def delete_message_callback(ref)
      @messagecbs.delete(ref)
    end

Delete a Presence callback

ref:[String] The reference of the callback to delete

    # File lib/xmpp4r/stream.rb, line 399
    def delete_presence_callback(ref)
      @presencecbs.delete(ref)
    end

Delete a Stanza callback

ref:[String] The reference of the callback to delete

    # File lib/xmpp4r/stream.rb, line 379
    def delete_stanza_callback(ref)
      @stanzacbs.delete(ref)
    end

Delete an XML message callback

ref:[String] The reference of the callback to delete

    # File lib/xmpp4r/stream.rb, line 339
    def delete_xml_callback(ref)
      @xmlcbs.delete(ref)
    end

Returns whether this stream is connected to a Jabber service

return:[Boolean] Connection status

    # File lib/xmpp4r/stream.rb, line 111
    def is_connected?
      return @status == CONNECTED
    end

Returns whether this stream is NOT connected to a Jabber service

return:[Boolean] Connection status

    # File lib/xmpp4r/stream.rb, line 119
    def is_disconnected?
      return @status == DISCONNECTED
    end

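Registers a block to handle exceptions raised while sending data or in the parser thread. This will likely be the first indication that the socket dropped in a Jabber session. The block is called with the exception, the stream, and a symbol naming where the error occurred (:start, :parser or :sending).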

    # File lib/xmpp4r/stream.rb, line 88
    def on_exception(&block)
      @exception_block = block
    end
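
How a registered exception block might receive its three arguments can be sketched without a real socket. Everything below is hypothetical and written for illustration: `SketchSender` imitates only the rescue path of the send method, and `BrokenIO` stands in for a file descriptor whose peer has gone away.

```ruby
# Hypothetical sender that imitates only the error path; not the library's Stream.
class SketchSender
  def initialize(io)
    @io = io
    @exception_block = nil
  end

  # Stores the handler, as on_exception does.
  def on_exception(&block)
    @exception_block = block
  end

  # Writes data; on failure, hands the error to the handler if one is set,
  # otherwise re-raises it.
  def send_data(data)
    @io << data
    @io.flush
  rescue StandardError => e
    raise unless @exception_block
    @exception_block.call(e, self, :sending)
  end
end

# Stand-in for a dropped socket: every write fails.
class BrokenIO
  def <<(_data)
    raise IOError, 'closed stream'
  end

  def flush; end
end

sender = SketchSender.new(BrokenIO.new)
sender.on_exception do |error, _stream, where|
  puts "#{where}: #{error.class}: #{error.message}"
end
sender.send_data('<presence/>')  # prints "sending: IOError: closed stream"
```

The symbol lets one handler tell a failed write apart from a parser failure, which is useful when deciding whether to reconnect.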

This method is called by the parser when a failure occurs

    # File lib/xmpp4r/stream.rb, line 94
    def parse_failure
      # A new thread has to be created because close will cause the thread
      # to commit suicide
      if @exception_block
        Thread.new { @exception_block.call($!, self, :parser) }
      else
        puts "Stream#parse_failure was called by XML parser. Dumping " +
        "backtrace...\n" + $!.exception + "\n"
        puts $!.backtrace
        close
        raise
      end
    end

Starts a polling thread to send "keep alive" data to prevent the Jabber connection from closing for inactivity.

Currently not working!

    # File lib/xmpp4r/stream.rb, line 307
    def poll
      sleep 10
      while true
        sleep 2
        # @pollCounter = @pollCounter - 1
        # if @pollCounter < 0
        #   begin
        #     send("  \t  ")
        #   rescue
        #     Thread.new {@exception_block.call if @exception_block}
        #     break
        #   end
        # end
      end
    end

Process |max| XML stanzas and call listeners for all of them.

max:[Integer] the number of stanzas to process (nil means process all available)
return:[Integer] the number of stanzas processed

    # File lib/xmpp4r/stream.rb, line 187
    def process(max = nil)
      n = 0
      @StanzaQueueMutex.lock
      while @StanzaQueue.size > 0 and (max == nil or n < max)
        e = @StanzaQueue.shift
        @StanzaQueueMutex.unlock
        process_one(e)
        n += 1
        @StanzaQueueMutex.lock
      end
      @StanzaQueueMutex.unlock
      n
    end
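
The draining loop above holds the mutex only while touching the queue and releases it while a stanza is being handled. A minimal sketch of the same idea follows, using Mutex#synchronize instead of explicit lock and unlock calls. `SketchQueue` is a hypothetical class for this example, and it assumes nil is never queued, since nil is used to signal an empty queue.

```ruby
# Hypothetical queue for illustration; not part of the library.
class SketchQueue
  def initialize(&handler)
    @queue = []
    @mutex = Mutex.new
    @handler = handler
  end

  # Called by the producer (the parser thread in the real class).
  def push(stanza)
    @mutex.synchronize { @queue.push(stanza) }
  end

  # Handles up to max queued items (all of them if max is nil) and
  # returns how many were handled.
  def process(max = nil)
    n = 0
    while max.nil? || n < max
      # Lock only around the queue access, not around the handler call.
      stanza = @mutex.synchronize { @queue.shift }
      break if stanza.nil?  # queue is empty (assumes nil is never pushed)
      @handler.call(stanza)
      n += 1
    end
    n
  end
end

queue = SketchQueue.new { |stanza| puts "handling #{stanza}" }
%w[a b c].each { |stanza| queue.push(stanza) }
puts queue.process(2)  # handles a and b, then prints 2
puts queue.process     # handles c, then prints 1
```

Releasing the lock during the handler call means a slow callback does not block the producer from queueing further stanzas.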

Processes a received REXML::Element and executes registered thread blocks and filters against it.

element:[REXML::Element] The received element

    # File lib/xmpp4r/stream.rb, line 128
    def receive(element)
      Jabber::debuglog("RECEIVED:\n#{element.to_s}")
      case element.name
      when 'stream'
        stanza = element
        i = element.attribute("id")
        @streamid = i.value if i
      when 'message'
        stanza = Message::import(element)
      when 'iq'
        stanza = Iq::import(element)
      when 'presence'
        stanza = Presence::import(element)
      else
        stanza = element
      end
      # Iterate through blocked threads (= waiting for an answer)
      @threadBlocks.each { |thread, proc|
        r = proc.call(stanza)
        if r == true
          @threadBlocks.delete(thread)
          thread.wakeup if thread.alive?
          return
        end
      }
      if @threaded
        process_one(stanza)
      else
        # StanzaQueue will be read when the user calls process
        @StanzaQueueMutex.lock
        @StanzaQueue.push(stanza)
        @StanzaQueueMutex.unlock
        @waitingThread.wakeup if @waitingThread
      end
    end
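
In the listing above, a stanza is first offered to the blocks of waiting threads and only falls through to normal processing when none of them claims it. The sketch below imitates that ordering without real threads: `SketchReceiver` is hypothetical, plain keys stand in for the waiting threads, and an array stands in for the regular callback path.

```ruby
# Hypothetical receiver for illustration; keys stand in for waiting threads.
class SketchReceiver
  attr_reader :fallthrough

  def initialize
    @waiters = {}      # key => predicate, standing in for thread => proc
    @fallthrough = []  # stanzas that no waiter claimed
  end

  # Registers a predicate that claims a stanza by returning true.
  def wait_for(key, &predicate)
    @waiters[key] = predicate
  end

  # Returns the key of the waiter that claimed the stanza, or nil if the
  # stanza went on to normal processing.
  def receive(stanza)
    match = @waiters.find { |_key, predicate| predicate.call(stanza) == true }
    if match
      @waiters.delete(match.first)  # a waiter is served only once
      return match.first
    end
    @fallthrough << stanza
    nil
  end
end

receiver = SketchReceiver.new
receiver.wait_for(:login) { |stanza| stanza.include?('id="auth1"') }
p receiver.receive('<iq id="auth1" type="result"/>')  # prints :login
p receiver.receive('<iq id="auth1" type="result"/>')  # prints nil
p receiver.fallthrough.size                           # prints 1
```

The second call prints nil because the waiter was removed after its first match, so the same stanza now takes the normal path.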

Sends XML data to the socket and (optionally) waits to process received data.

xml:[String] The xml data to send
proc:[Proc = nil] The optional proc
&block:[Block] The optional block

    # File lib/xmpp4r/stream.rb, line 243
    def send(xml, proc=nil, &block)
      Jabber::debuglog("SENDING:\n#{ xml.kind_of?(String) ? xml : xml.to_s }")
      xml = xml.to_s if not xml.kind_of? String
      block = proc if proc
      @threadBlocks[Thread.current]=block if block
      Thread.critical = true # we don't want to be interrupted before we stop!
      begin
        @fd << xml
        @fd.flush
      rescue
        if @exception_block
          @exception_block.call($!, self, :sending)
        else
          puts "Exception caught while sending!"
          raise
        end
      end
      Thread.critical = false
      # The parser thread might be running this (think of a callback running send())
      # If this is the case, we mustn't stop (or we would cause a deadlock)
      Thread.stop if block and Thread.current != @parserThread
      @pollCounter = 10
    end

Send an XMPP stanza with a Jabber::XMLStanza#id. The id will be generated by Jabber::IdGenerator if not already set.

The block will be called once: when receiving a stanza with the same Jabber::XMLStanza#id. It must return true to complete this!

Be aware that if a stanza with type=’error’ is received the function does not yield but raises an ErrorException with the corresponding error element.

xml:[XMLStanza]

    # File lib/xmpp4r/stream.rb, line 278
    def send_with_id(xml, &block)
      if xml.id.nil?
        xml.id = Jabber::IdGenerator.instance.generate_id
      end

      error = nil
      send(xml) do |received|
        if received.id == xml.id
          if received.type == :error
            error = received.error
            true
          else
            yield(received)
          end
        else
          false
        end
      end

      unless error.nil?
        raise ErrorException.new(error)
      end
    end
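
The id-matching rule can be sketched over a plain array of incoming stanzas instead of a live stream. All names below are hypothetical: `SketchStanza`, `SketchErrorReply` and `SketchExchange.await_reply` exist only in this example, and SecureRandom stands in for the library's id generator.

```ruby
require 'securerandom'

# Hypothetical stand-ins for a stanza and the error raised on type == :error.
SketchStanza = Struct.new(:id, :type, :payload)
class SketchErrorReply < StandardError; end

module SketchExchange
  # Gives the request an id if it has none, then scans the incoming stanzas.
  # Stanzas with a different id are skipped; an error reply raises instead of
  # yielding; otherwise the block decides (by returning true) whether the
  # exchange is complete.
  def self.await_reply(request, incoming)
    request.id ||= SecureRandom.hex(4)
    incoming.each do |received|
      next unless received.id == request.id
      raise SketchErrorReply, received.payload.to_s if received.type == :error
      return received if yield(received) == true
    end
    nil
  end
end

request = SketchStanza.new('q1', :get, 'roster?')
incoming = [
  SketchStanza.new('other', :result, 'ignored'),
  SketchStanza.new('q1', :result, 'roster data')
]
SketchExchange.await_reply(request, incoming) do |received|
  puts "got #{received.payload}"  # prints "got roster data"
  true
end
```

Because the error case raises rather than yields, the block in this sketch only ever sees non-error replies, which matches the behaviour described above.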

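Starts the XML parser on the given file descriptor in a new thread and marks the stream as connected

fd:The file descriptor to read from and write to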

    # File lib/xmpp4r/stream.rb, line 55
    def start(fd)
      @fd = fd
      @parser = StreamParser.new(@fd, self)
      @parserThread = Thread.new do
        begin
          @parser.parse
        rescue
          if @exception_block
            Thread.new { @exception_block.call($!, self, :start) }
          else
            puts "Exception caught in Parser thread!"
            raise
          end
        end
      end
      # @pollThread = Thread.new do
      #   begin
      #   poll
      #   rescue
      #     puts "Exception caught in Poll thread, dumping backtrace and" +
      #       " exiting...\n" + $!.exception + "\n"
      #     puts $!.backtrace
      #     exit
      #   end
      # end
      @status = CONNECTED
    end

Process one XML stanza and call the listeners for it. If no stanza is currently available, wait for at most |time| seconds before returning.

time:[Integer] time to wait in seconds. If nil, wait indefinitely.
return:[Integer] the number of stanzas processed (0 or 1)

    # File lib/xmpp4r/stream.rb, line 207
    def wait_and_process(time = nil)
      if time == 0
        return process(1)
      end
      @StanzaQueueMutex.lock
      if @StanzaQueue.size > 0
        e = @StanzaQueue.shift
        @StanzaQueueMutex.unlock
        process_one(e)
        return 1
      end

      @waitingThread = Thread.current
      @wakeupThread = Thread.new { sleep time ; @waitingThread.wakeup if @waitingThread }
      @waitingThread.stop
      @wakeupThread.kill if @wakeupThread
      @wakeupThread = nil
      @waitingThread = nil

      @StanzaQueueMutex.lock
      if @StanzaQueue.size > 0
        e = @StanzaQueue.shift
        @StanzaQueueMutex.unlock
        process_one(e)
        return 1
      end
      return 0
    end
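
The same "handle one stanza, or give up after a timeout" contract can also be expressed with a condition variable. The sketch below is a different technique from the wakeup thread used in the listing, not a copy of it. `SketchWaitQueue` is hypothetical, and it assumes nil is never queued.

```ruby
# Hypothetical timed-wait queue built on Mutex and ConditionVariable.
class SketchWaitQueue
  def initialize(&handler)
    @queue = []
    @mutex = Mutex.new
    @ready = ConditionVariable.new
    @handler = handler
  end

  # Producer side: queue a stanza and wake one waiting consumer.
  def push(stanza)
    @mutex.synchronize do
      @queue.push(stanza)
      @ready.signal
    end
  end

  # Handles at most one stanza. Waits up to time seconds for one to arrive
  # (nil waits indefinitely, 0 does not wait). Returns 1 or 0.
  def wait_and_process(time = nil)
    stanza = @mutex.synchronize do
      deadline = time && (now + time)
      while @queue.empty?
        remaining = deadline && (deadline - now)
        break if remaining && remaining <= 0
        # Releases the mutex while waiting and re-acquires it on wakeup.
        @ready.wait(@mutex, remaining)
      end
      @queue.shift  # nil when the wait timed out
    end
    return 0 if stanza.nil?
    @handler.call(stanza)  # run the handler outside the lock
    1
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

queue = SketchWaitQueue.new { |stanza| puts "handled #{stanza}" }
producer = Thread.new { sleep 0.1; queue.push('<message/>') }
puts queue.wait_and_process(1)
producer.join
puts queue.wait_and_process(0)
```

The loop around the wait re-checks the queue after every wakeup, so a spurious wakeup or an early signal cannot cause a stanza to be missed.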

Private Instance methods

Process |element| until it is consumed. Returns element.consumed?

element:The element to process

    # File lib/xmpp4r/stream.rb, line 167
    def process_one(stanza)
      Jabber::debuglog("PROCESSING:\n#{stanza.to_s}")
      return true if @xmlcbs.process(stanza)
      return true if @stanzacbs.process(stanza)
      case stanza
      when Message
        return true if @messagecbs.process(stanza)
      when Iq
        return true if @iqcbs.process(stanza)
      when Presence
        return true if @presencecbs.process(stanza)
      end
    end
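
The dispatch order in the listing is: general callbacks first, then the callbacks for the stanza's own class. A reduced sketch of that chain is shown below. `SketchDispatcher` and the three Struct types are hypothetical, a single general list stands in for both the XML and stanza callback lists, and the method returns false rather than nil when nothing consumed the stanza.

```ruby
# Hypothetical stanza types standing in for Message, Iq and Presence.
SketchMessage  = Struct.new(:body)
SketchIq       = Struct.new(:query)
SketchPresence = Struct.new(:show)

# Hypothetical dispatcher; not the library's implementation.
class SketchDispatcher
  def initialize
    @general = []                               # tried for every stanza
    @by_class = Hash.new { |h, k| h[k] = [] }   # tried only for matching classes
  end

  def on_any(&block)
    @general << block
  end

  def on(klass, &block)
    @by_class[klass] << block
  end

  # General callbacks run first; class-specific ones run only if none of the
  # general callbacks consumed the stanza by returning true.
  def process_one(stanza)
    return true if @general.any? { |cb| cb.call(stanza) == true }
    return true if @by_class[stanza.class].any? { |cb| cb.call(stanza) == true }
    false
  end
end

dispatcher = SketchDispatcher.new
dispatcher.on_any { |stanza| puts "saw #{stanza.class}"; false }
dispatcher.on(SketchMessage) { |msg| puts "message: #{msg.body}"; true }

p dispatcher.process_one(SketchMessage.new('hello'))  # prints true after both callbacks ran
p dispatcher.process_one(SketchPresence.new('away'))  # prints false; only the general callback ran
```

A general callback that returns true would hide the stanza from every class-specific callback, so such callbacks are best kept for logging or filtering.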
