Class MCollective::Connector::Stomp
In: plugins/mcollective/connector/stomp.rb
Parent: Base

Handles sending and receiving messages over the Stomp protocol

This plugin supports version 1.1 and version 1.1.6 or newer of the Stomp rubygem; the versions in between had multithreading issues.

For all versions you can configure it as follows:

   connector = stomp
   plugin.stomp.host = stomp.your.net
   plugin.stomp.port = 6163
   plugin.stomp.user = you
   plugin.stomp.password = secret

All of these can be overridden per user using environment variables:

   STOMP_SERVER, STOMP_PORT, STOMP_USER, STOMP_PASSWORD
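The override order described above can be sketched roughly as follows. This is only an illustration: `env_or_option` is a hypothetical stand-in for the plugin's `get_env_or_option` helper (whose implementation is not shown on this page), the environment and plugin configuration are passed in as plain hashes, and raising when nothing is configured is an assumption.

```ruby
# Hypothetical stand-in for get_env_or_option; the real helper is not
# shown in this document, so its exact behaviour is assumed here.
def env_or_option(env, pluginconf, env_key, opt_key, default = nil)
  return env[env_key] if env.include?(env_key)               # environment wins
  return pluginconf[opt_key] if pluginconf.include?(opt_key) # then the config file
  return default unless default.nil?                         # then the default
  raise "No #{env_key} environment or plugin.#{opt_key} configuration option given"
end

# Plain hashes standing in for @config.pluginconf and ENV
pluginconf = { "stomp.host" => "stomp.your.net", "stomp.user" => "you" }
env        = { "STOMP_USER" => "alice" }

user = env_or_option(env, pluginconf, "STOMP_USER", "stomp.user")
host = env_or_option(env, pluginconf, "STOMP_SERVER", "stomp.host")
port = env_or_option(env, pluginconf, "STOMP_PORT", "stomp.port", 6163).to_i
```

Here `user` comes from the environment, `host` from the configuration, and `port` falls back to the default.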

Versions 1.1.6 and newer of the gem support multiple connections and fail over between the servers. You can configure this as follows:

    connector = stomp
    plugin.stomp.pool.size = 2

    plugin.stomp.pool.host1 = stomp1.your.net
    plugin.stomp.pool.port1 = 6163
    plugin.stomp.pool.user1 = you
    plugin.stomp.pool.password1 = secret
    plugin.stomp.pool.ssl1 = true

    plugin.stomp.pool.host2 = stomp2.your.net
    plugin.stomp.pool.port2 = 6163
    plugin.stomp.pool.user2 = you
    plugin.stomp.pool.password2 = secret
    plugin.stomp.pool.ssl2 = false

With this method you can still supply STOMP_USER and STOMP_PASSWORD through the environment, but you must set the hostname for each pool member in the config. The port defaults to 6163 if not specified.
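As a rough sketch of how the numbered pool options map to a list of hosts, assuming a plain Hash in place of the plugin configuration: `build_pool_hosts` is a hypothetical name, and treating only the string "true" as a true SSL flag is an assumption, since the plugin's `get_bool_option` helper is not shown here.

```ruby
# Sketch only: pluginconf is a plain Hash standing in for @config.pluginconf,
# and env is a Hash (or ENV) holding STOMP_USER / STOMP_PASSWORD overrides.
def build_pool_hosts(pluginconf, env = ENV)
  size = pluginconf.fetch("stomp.pool.size").to_i

  (1..size).map do |n|
    host = pluginconf["stomp.pool.host#{n}"]
    # The hostname has no environment override, so it must be configured
    raise "stomp.pool.host#{n} is required" if host.nil?

    {
      :host     => host,
      :port     => pluginconf.fetch("stomp.pool.port#{n}", 6163).to_i,
      :login    => env["STOMP_USER"] || pluginconf["stomp.pool.user#{n}"],
      :passcode => env["STOMP_PASSWORD"] || pluginconf["stomp.pool.password#{n}"],
      # Assumed boolean parsing: only the string "true" enables SSL
      :ssl      => pluginconf.fetch("stomp.pool.ssl#{n}", "false").to_s == "true"
    }
  end
end

conf = {
  "stomp.pool.size"  => "2",
  "stomp.pool.host1" => "stomp1.your.net",
  "stomp.pool.user1" => "you",
  "stomp.pool.ssl1"  => "true",
  "stomp.pool.host2" => "stomp2.your.net",
  "stomp.pool.port2" => "6164"
}

hosts = build_pool_hosts(conf, "STOMP_PASSWORD" => "secret")
```

Each entry in `hosts` uses the same keys the plugin passes to the Stomp gem (`:host`, `:port`, `:login`, `:passcode`, `:ssl`).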

In addition, you can set the following options, but only when using the pooled configuration:

    plugin.stomp.pool.initial_reconnect_delay = 0.01
    plugin.stomp.pool.max_reconnect_delay = 30.0
    plugin.stomp.pool.use_exponential_back_off = true
    plugin.stomp.pool.back_off_multiplier = 2
    plugin.stomp.pool.max_reconnect_attempts = 0
    plugin.stomp.pool.randomize = false
    plugin.stomp.pool.backup = false
    plugin.stomp.pool.timeout = -1
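The plugin source notes that the meaning of these options has to be guessed from their names. One plausible reading of the delay-related settings, shown purely as an assumption and not as the Stomp gem's confirmed behaviour, is that the delay starts at the initial value, is multiplied after each failed attempt, and is capped at the maximum. `reconnect_delays` is a hypothetical helper written for this illustration.

```ruby
# Assumed interpretation of the reconnect options; this is not code from
# the Stomp gem and the gem may behave differently.
def reconnect_delays(attempts, initial: 0.01, max: 30.0, exponential: true, multiplier: 2)
  delay = initial

  Array.new(attempts) do
    current = delay
    # Grow the delay for the next attempt, never exceeding the cap
    delay = [delay * multiplier, max].min if exponential
    current
  end
end

delays      = reconnect_delays(5)
flat_delays = reconnect_delays(3, exponential: false)
long_run    = reconnect_delays(20)
```

Under this reading, the defaults give very short waits at first and settle at 30 seconds between attempts once the cap is reached.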

Methods

connect   disconnect   new   receive   send   subscribe   unsubscribe  

Attributes

connection  [R] 

Public Class methods

[Source]

    # File plugins/mcollective/connector/stomp.rb, line 57
    def initialize
        @config = Config.instance
        @subscriptions = []

        @log = Log.instance
    end

Public Instance methods

Connects to the Stomp middleware

[Source]

    # File plugins/mcollective/connector/stomp.rb, line 65
    def connect
        if @connection
            @log.debug("Already connected, not re-initializing connection")
            return
        end

        begin
            host = nil
            port = nil
            user = nil
            password = nil

            # Maintain backward compat for older stomps
            unless @config.pluginconf.include?("stomp.pool.size")
                host = get_env_or_option("STOMP_SERVER", "stomp.host")
                port = get_env_or_option("STOMP_PORT", "stomp.port", 6163).to_i
                user = get_env_or_option("STOMP_USER", "stomp.user")
                password = get_env_or_option("STOMP_PASSWORD", "stomp.password")

                @log.debug("Connecting to #{host}:#{port}")
                @connection = ::Stomp::Connection.new(user, password, host, port, true)
            else
                pools = @config.pluginconf["stomp.pool.size"].to_i
                hosts = []

                1.upto(pools) do |poolnum|
                    host = {}

                    host[:host] = get_option("stomp.pool.host#{poolnum}")
                    host[:port] = get_option("stomp.pool.port#{poolnum}", 6163).to_i
                    host[:login] = get_env_or_option("STOMP_USER", "stomp.pool.user#{poolnum}")
                    host[:passcode] = get_env_or_option("STOMP_PASSWORD", "stomp.pool.password#{poolnum}")
                    host[:ssl] = get_bool_option("stomp.pool.ssl#{poolnum}", false)

                    @log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
                    hosts << host
                end

                raise "No hosts found for the STOMP connection pool" if hosts.size == 0

                connection = {:hosts => hosts}

                # Various STOMP gem options; the defaults here match the defaults of gem 1.1.6.
                # The meaning of these can be guessed, the documentation isn't clear
                connection[:initial_reconnect_delay] = get_option("stomp.pool.initial_reconnect_delay", 0.01).to_f
                connection[:max_reconnect_delay] = get_option("stomp.pool.max_reconnect_delay", 30.0).to_f
                connection[:use_exponential_back_off] = get_bool_option("stomp.pool.use_exponential_back_off", true)
                # Numeric options are read with get_option and converted, not parsed as booleans
                connection[:back_off_multiplier] = get_option("stomp.pool.back_off_multiplier", 2).to_i
                connection[:max_reconnect_attempts] = get_option("stomp.pool.max_reconnect_attempts", 0).to_i
                connection[:randomize] = get_bool_option("stomp.pool.randomize", false)
                connection[:backup] = get_bool_option("stomp.pool.backup", false)
                connection[:timeout] = get_option("stomp.pool.timeout", -1).to_i

                @connection = ::Stomp::Connection.new(connection)
            end
        rescue Exception => e
            raise("Could not connect to Stomp Server: #{e}")
        end
    end

Disconnects from the Stomp connection

[Source]

    # File plugins/mcollective/connector/stomp.rb, line 164
    def disconnect
        @log.debug("Disconnecting from Stomp")
        @connection.disconnect
    end

Receives a message from the Stomp connection

[Source]

    # File plugins/mcollective/connector/stomp.rb, line 126
    def receive
        @log.debug("Waiting for a message from Stomp")
        msg = @connection.receive

        # STOMP puts the payload in the body variable, pass that
        # into the payload of MCollective::Request and discard all the
        # other headers etc that stomp provides
        Request.new(msg.body)
    end

Sends a message to the Stomp connection

[Source]

    # File plugins/mcollective/connector/stomp.rb, line 137
    def send(target, msg)
        @log.debug("Sending a message to Stomp target '#{target}'")
        # deal with deprecation warnings in newer stomp gems
        if @connection.respond_to?("publish")
            @connection.publish(target, msg)
        else
            @connection.send(target, msg)
        end
    end
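The fallback between publish and send shown in the source can be tried in isolation with stand-in objects. In this sketch `NewStyleConnection`, `OldStyleConnection`, and `deliver` are hypothetical names invented for the illustration; they only mimic the one method each gem generation is assumed to expose.

```ruby
# Hypothetical stand-in for a connection from a newer Stomp gem
class NewStyleConnection
  attr_reader :sent

  def initialize
    @sent = []
  end

  def publish(target, msg)
    @sent << [:publish, target, msg]
  end
end

# Hypothetical stand-in for a connection from an older Stomp gem
class OldStyleConnection
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send(target, msg)
    @sent << [:send, target, msg]
  end
end

# Prefer publish when the connection offers it, otherwise fall back to send
def deliver(connection, target, msg)
  if connection.respond_to?(:publish)
    connection.publish(target, msg)
  else
    connection.send(target, msg)
  end
end

newer = NewStyleConnection.new
older = OldStyleConnection.new
deliver(newer, "/topic/example", "hello")
deliver(older, "/topic/example", "hello")
```

Checking `respond_to?` at call time keeps a single code path working against both gem generations without inspecting version numbers.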

Subscribe to a topic or queue

[Source]

    # File plugins/mcollective/connector/stomp.rb, line 148
    def subscribe(source)
        unless @subscriptions.include?(source)
            @log.debug("Subscribing to #{source}")
            @connection.subscribe(source)
            @subscriptions << source
        end
    end

Unsubscribe from a topic or queue

[Source]

    # File plugins/mcollective/connector/stomp.rb, line 157
    def unsubscribe(source)
        @log.debug("Unsubscribing from #{source}")
        @connection.unsubscribe(source)
        @subscriptions.delete(source)
    end
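The bookkeeping in subscribe and unsubscribe can be illustrated with a small stand-alone sketch. `RecordingConnection` and `SubscriptionTracker` are hypothetical classes written for this example; they mirror the logic of the two methods without needing a real Stomp server.

```ruby
# Hypothetical stand-in for the Stomp connection that records each call
class RecordingConnection
  attr_reader :calls

  def initialize
    @calls = []
  end

  def subscribe(source)
    @calls << [:subscribe, source]
  end

  def unsubscribe(source)
    @calls << [:unsubscribe, source]
  end
end

# Mirrors the connector's subscription bookkeeping
class SubscriptionTracker
  attr_reader :subscriptions

  def initialize(connection)
    @connection = connection
    @subscriptions = []
  end

  # Subscribing twice to the same source only reaches the connection once
  def subscribe(source)
    return if @subscriptions.include?(source)

    @connection.subscribe(source)
    @subscriptions << source
  end

  # Unsubscribing always reaches the connection and forgets the source
  def unsubscribe(source)
    @connection.unsubscribe(source)
    @subscriptions.delete(source)
  end
end

connection = RecordingConnection.new
tracker = SubscriptionTracker.new(connection)
tracker.subscribe("/topic/example")
tracker.subscribe("/topic/example")
tracker.unsubscribe("/topic/example")
```

The second subscribe call is a no-op, so the stand-in connection sees one subscribe followed by one unsubscribe.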
