Class | Jabber::Bytestreams::IBB |
In: |
lib/xmpp4r/bytestreams/helper/ibb/base.rb
|
Parent: | Object |
In-Band Bytestreams (JEP-0047) implementation
Don‘t use directly, use IBBInitiator and IBBTarget
In-Band Bytestreams should only be used when transferring very small amounts of binary data, because it is slow and increases server load drastically.
Note that the constructor takes a lot of arguments. In-Band Bytestreams do not specify a way to initiate the stream, this should be done via Stream Initiation.
NS_IBB | = | 'http://jabber.org/protocol/ibb' |
Create a new bytestream
Will register a <message/> callback to intercept data of this stream. This data will be buffered, you can retrieve it with receive
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 30 30: def initialize(stream, session_id, my_jid, peer_jid) 31: @stream = stream 32: @session_id = session_id 33: @my_jid = (my_jid.kind_of?(String) ? JID.new(my_jid) : my_jid) 34: @peer_jid = (peer_jid.kind_of?(String) ? JID.new(peer_jid) : peer_jid) 35: 36: @active = false 37: @seq_send = 0 38: @seq_recv = 0 39: @queue = [] 40: @queue_lock = Mutex.new 41: @pending = Semaphore.new 42: @sendbuf = '' 43: @sendbuf_lock = Mutex.new 44: 45: @block_size = 4096 # Recommended by JEP0047 46: end
Close the stream
Waits for acknowledge from peer, may throw ErrorException
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 128 128: def close 129: if active? 130: flush 131: deactivate 132: 133: iq = Iq.new(:set, @peer_jid) 134: close = iq.add REXML::Element.new('close') 135: close.add_namespace IBB::NS_IBB 136: close.attributes['sid'] = @session_id 137: 138: @stream.send_with_id(iq) { |answer| 139: answer.type == :result 140: } 141: end 142: end
Empty the send-buffer by sending remaining data
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 72 72: def flush 73: @sendbuf_lock.synchronize { 74: while @sendbuf.size > 0 75: send_data(@sendbuf[0..@block_size-1]) 76: @sendbuf = @sendbuf[@block_size..-1].to_s 77: end 78: } 79: end
Receive data
Will wait until the Message with the next sequence number is in the stanza queue.
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 86 86: def read 87: if active? 88: res = nil 89: 90: while res.nil? 91: @queue_lock.synchronize { 92: @queue.each { |item| 93: # Find next data 94: if item.type == :data and item.seq == @seq_recv.to_s 95: res = item 96: break 97: # No data? Find close 98: elsif item.type == :close and res.nil? 99: res = item 100: end 101: } 102: 103: @queue.delete_if { |item| item == res } 104: } 105: 106: # No data? Wait for next to arrive... 107: @pending.wait unless res 108: end 109: 110: if res.type == :data 111: @seq_recv += 1 112: @seq_recv = 0 if @seq_recv > 65535 113: res.data 114: elsif res.type == :close 115: deactivate 116: nil # Closed 117: end 118: else 119: nil 120: end 121: end
Send data
Data is buffered to match block_size in each packet. If you need the data to be sent immediately, use flush afterwards.
buf: | [String] |
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 59 59: def write(buf) 60: @sendbuf_lock.synchronize { 61: @sendbuf += buf 62: 63: while @sendbuf.size >= @block_size 64: send_data(@sendbuf[0..@block_size-1]) 65: @sendbuf = @sendbuf[@block_size..-1].to_s 66: end 67: } 68: end
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 182 182: def activate 183: unless active? 184: @stream.add_message_callback(200, self) { |msg| 185: data = msg.first_element('data') 186: if msg.from == @peer_jid and msg.to == @my_jid and data and data.attributes['sid'] == @session_id 187: if msg.type == nil 188: @queue_lock.synchronize { 189: @queue.push IBBQueueItem.new(:data, data.attributes['seq'], data.text.to_s) 190: @pending.run 191: } 192: elsif msg.type == :error 193: @queue_lock.synchronize { 194: @queue << IBBQueueItem.new(:close) 195: @pending.run 196: } 197: end 198: true 199: else 200: false 201: end 202: } 203: 204: @stream.add_iq_callback(200, self) { |iq| 205: close = iq.first_element('close') 206: if iq.type == :set and close and close.attributes['sid'] == @session_id 207: answer = iq.answer(false) 208: answer.type = :result 209: @stream.send(answer) 210: 211: @queue_lock.synchronize { 212: @queue << IBBQueueItem.new(:close) 213: @pending.run 214: } 215: true 216: else 217: false 218: end 219: } 220: 221: @active = true 222: end 223: end
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 225 225: def deactivate 226: if active? 227: @stream.delete_message_callback(self) 228: @stream.delete_iq_callback(self) 229: 230: @active = false 231: end 232: end
Send data directly
data: | [String] |
# File lib/xmpp4r/bytestreams/helper/ibb/base.rb, line 149 149: def send_data(databuf) 150: if active? 151: msg = Message.new 152: msg.from = @my_jid 153: msg.to = @peer_jid 154: 155: data = msg.add REXML::Element.new('data') 156: data.add_namespace NS_IBB 157: data.attributes['sid'] = @session_id 158: data.attributes['seq'] = @seq_send.to_s 159: data.text = Base64::encode64 databuf 160: 161: # TODO: Implement AMP correctly 162: amp = msg.add REXML::Element.new('amp') 163: amp.add_namespace 'http://jabber.org/protocol/amp' 164: deliver_at = amp.add REXML::Element.new('rule') 165: deliver_at.attributes['condition'] = 'deliver-at' 166: deliver_at.attributes['value'] = 'stored' 167: deliver_at.attributes['action'] = 'error' 168: match_resource = amp.add REXML::Element.new('rule') 169: match_resource.attributes['condition'] = 'match-resource' 170: match_resource.attributes['value'] = 'exact' 171: match_resource.attributes['action'] = 'error' 172: 173: @stream.send(msg) 174: 175: @seq_send += 1 176: @seq_send = 0 if @seq_send > 65535 177: else 178: raise 'Attempt to send data when not activated' 179: end 180: end