Typical Stomp client class. Uses a listener thread to receive frames from the server; any thread can send.
All receives happen in one thread, so avoid heavy processing in that thread when message volume is high.
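One common way to keep the receiving thread light is to have the callback only enqueue the message and let worker threads do the slow part. The following is a minimal sketch using only Ruby's standard library; it does not touch the Stomp API, and `work_queue`, `results`, `on_message` and the `upcase` "processing" step are illustrative names, not part of this library.

```ruby
require 'thread'

work_queue = Queue.new   # messages waiting to be processed
results    = Queue.new   # processed output, collected for the demo

# Worker threads do the slow processing outside the receiving thread.
workers = 2.times.map do
  Thread.new do
    while (msg = work_queue.pop) != :stop
      results << msg.upcase   # placeholder for real processing
    end
  end
end

# What a subscription callback might do: enqueue and return immediately.
on_message = lambda { |body| work_queue << body }

%w[a b c].each { |body| on_message.call(body) }

# One stop marker per worker, then wait for them to drain the queue.
workers.size.times { work_queue << :stop }
workers.each(&:join)

processed = []
processed << results.pop until results.empty?
```

With two workers the order of `processed` is not guaranteed, so callers that need ordering would use a single worker instead.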
A new Client object can be initialized using two forms:
Standard positional parameters:
login (String, default : '')
passcode (String, default : '')
host (String, default : 'localhost')
port (Integer, default : 61613)
reliable (Boolean, default : false)
e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL:
A Stomp URL must begin with 'stomp://' and can be in one of the following forms:
stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port
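To make the four URL forms concrete, here is a small standalone parser. It is only a sketch: the library's own `url_regex` is not shown on this page, so the `STOMP_URL` pattern and the `parse_stomp_url` helper below are assumptions written for illustration, not the library's implementation.

```ruby
# Illustrative pattern only; NOT the library's url_regex.
# Optional "login:passcode@", then a host name and a numeric port.
STOMP_URL = %r{\Astomp://(?:([^:@/]+):([^:@/]+)@)?([\w.-]+):(\d+)\z}

# Hypothetical helper: returns a Hash of connection settings,
# or nil when the string is not in one of the documented forms.
def parse_stomp_url(url)
  m = STOMP_URL.match(url)
  return nil unless m
  {
    :login    => m[1] || '',   # empty when no credentials are given
    :passcode => m[2] || '',
    :host     => m[3],
    :port     => m[4].to_i
  }
end

plain = parse_stomp_url('stomp://host.domain.tld:61613')
auth  = parse_stomp_url('stomp://login:passcode@localhost:61613')
bad   = parse_stomp_url('http://localhost:61613')
```

Here `plain` has empty credentials, `auth` carries both login and passcode, and `bad` is nil because the string does not begin with 'stomp://'.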
# File lib/stomp/client.rb, line 37
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  # Parse stomp:// URL's or set params
  if login.is_a?(Hash)
    @parameters = login
    first_host = @parameters[:hosts][0]
    @login = first_host[:login]
    @passcode = first_host[:passcode]
    @host = first_host[:host]
    @port = first_host[:port] || Connection::default_port(first_host[:ssl])
    @reliable = true
  elsif login =~ /^stomp:\/\/#{url_regex}/ # e.g. stomp://login:passcode@host:port or stomp://host:port
    @login = $2 || ""
    @passcode = $3 || ""
    @host = $4
    @port = $5.to_i
    @reliable = false
  elsif login =~ /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex}\))+(\?(.*))?$/ # e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param
    first_host = {}
    first_host[:ssl] = !$2.nil?
    @login = first_host[:login] = $4 || ""
    @passcode = first_host[:passcode] = $5 || ""
    @host = first_host[:host] = $6
    @port = first_host[:port] = $7.to_i || Connection::default_port(first_host[:ssl])
    options = $16 || ""
    parts = options.split(/&|=/)
    options = Hash[*parts]
    hosts = [first_host] + parse_hosts(login)
    @parameters = {}
    @parameters[:hosts] = hosts
    @parameters.merge! filter_options(options)
    @reliable = true
  else
    @login = login
    @passcode = passcode
    @host = host
    @port = port.to_i
    @reliable = reliable
  end
  check_arguments!
  @id_mutex = Mutex.new
  @ids = 1
  if @parameters
    @connection = Connection.new(@parameters)
  else
    @connection = Connection.new(@login, @passcode, @host, @port, @reliable)
  end
  start_listeners
end
Syntactic sugar for '::new'. See 'initialize' for usage.
# File lib/stomp/client.rb, line 103
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end
Abort a transaction by name
# File lib/stomp/client.rb, line 119
def abort(name, headers = {})
  @connection.abort(name, headers)
  # lets replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn[name]
  if replay_list
    replay_list.each do |message|
      if listener = find_listener(message)
        listener.call(message)
      end
    end
  end
end
Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' ).
Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/client.rb, line 168
def acknowledge(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    replay_list = @replay_messages_by_txn[txn_id]
    if replay_list.nil?
      replay_list = []
      @replay_messages_by_txn[txn_id] = replay_list
    end
    replay_list << message
  end
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.ack message.headers['message-id'], headers
end
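The replay bookkeeping shared by 'acknowledge' and 'abort' can be modelled in isolation. The sketch below is a simplified in-memory stand-in with no broker or connection involved; the `ReplayBook` class and its method names are hypothetical and exist only to show the idea of remembering messages acknowledged inside a transaction so they can be handed back to a listener on abort.

```ruby
# Hypothetical in-memory model; not part of the Stomp library.
class ReplayBook
  def initialize
    @by_txn = {}   # transaction id => list of acknowledged messages
  end

  # Remember a message acknowledged inside a transaction.
  def record_ack(txn_id, message)
    (@by_txn[txn_id] ||= []) << message
  end

  # On abort, hand every remembered message back to the caller's block.
  def abort(txn_id)
    (@by_txn[txn_id] || []).each { |message| yield message }
  end

  # On commit, the remembered messages are no longer needed.
  def commit(txn_id)
    @by_txn.delete(txn_id)
  end
end

book = ReplayBook.new
book.record_ack('tx1', 'm1')
book.record_ack('tx1', 'm2')

replayed = []
book.abort('tx1') { |m| replayed << m }   # both messages are replayed, in order
```

In the real client the replayed messages go to the listener found for each message; here a block plays that role.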
Begin a transaction by name
# File lib/stomp/client.rb, line 114
def begin(name, headers = {})
  @connection.begin(name, headers)
end
Close out resources in use by this client
# File lib/stomp/client.rb, line 232
def close headers={}
  @listener_thread.exit
  @connection.disconnect headers
end
Is this client closed?
# File lib/stomp/client.rb, line 227
def closed?
  @connection.closed?
end
Commit a transaction by name
# File lib/stomp/client.rb, line 134
def commit(name, headers = {})
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id)
  @connection.commit(name, headers)
end
# File lib/stomp/client.rb, line 213
def connection_frame
  @connection.connection_frame
end
# File lib/stomp/client.rb, line 217
def disconnect_receipt
  @connection.disconnect_receipt
end
Join the listener thread for this client, generally used to wait for a quit signal
# File lib/stomp/client.rb, line 109
def join(limit = nil)
  @listener_thread.join(limit)
end
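Because 'join' passes its limit straight to Ruby's Thread#join, the return value follows the standard library's rules: nil if the limit expires while the thread is still running, otherwise the thread itself. A short standalone sketch, where the `listener` thread is only a stand-in for the client's listener thread:

```ruby
# Stand-in for a long-running listener thread: sleeps until killed.
listener = Thread.new { sleep }

# The thread is still running when the limit expires, so join returns nil.
timed_out = listener.join(0.05)

# Once the thread has terminated, join returns the thread object.
listener.kill
finished = listener.join(1)
```

A caller can therefore loop on a timed join to wake up periodically, for example to check a shutdown flag.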
Stomp 1.1+ NACK
# File lib/stomp/client.rb, line 186
def nack(message_id, headers = {})
  @connection.nack message_id, headers
end
# File lib/stomp/client.rb, line 209
def obj_send(*args)
  __send__(*args)
end
Is this client open?
# File lib/stomp/client.rb, line 222
def open?
  @connection.open?
end
Convenience method
# File lib/stomp/client.rb, line 248
def protocol()
  @connection.protocol
end
Publishes message to destination
If a block is given, a receipt is requested and passed to the block when it arrives.
Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/client.rb, line 202
def publish(destination, message, headers = {})
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.publish(destination, message, headers)
end
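The receipt mechanism used by 'publish' pairs an id sent in the 'receipt' header with a stored callback, which runs when the server answers with a RECEIPT frame carrying the same id. The body of 'register_receipt_listener' is not shown on this page, so the following is only an assumed, simplified model; `ReceiptBook`, `register` and `dispatch` are hypothetical names.

```ruby
# Hypothetical model of receipt bookkeeping; not the library's code.
class ReceiptBook
  def initialize
    @next_id   = 0
    @listeners = {}   # receipt id => callback
  end

  # Store the callback and return the id to send in the 'receipt' header.
  def register(callback)
    id = (@next_id += 1).to_s
    @listeners[id] = callback
    id
  end

  # Called when a RECEIPT frame with the given receipt id arrives.
  # The callback is removed first, so it runs at most once.
  def dispatch(receipt_id, frame)
    callback = @listeners.delete(receipt_id)
    callback.call(frame) if callback
  end
end

book    = ReceiptBook.new
seen    = []
headers = {}
headers['receipt'] = book.register(lambda { |r| seen << r })

# Simulate the server's RECEIPT frame arriving for that id.
book.dispatch(headers['receipt'], 'RECEIPT frame')
```

Removing the callback on dispatch keeps the table from growing as more messages are published with receipts.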
Check if the thread was created and isn't dead
# File lib/stomp/client.rb, line 238
def running
  @listener_thread && !!@listener_thread.status
end
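The `!!@listener_thread.status` idiom relies on how Ruby's Thread#status reports state: a string such as "run" or "sleep" for a live thread, false for a thread that finished normally, and nil for one that died from an exception. A short standalone illustration, with `alive` and `finished` as stand-in threads:

```ruby
alive    = Thread.new { sleep }    # stays alive until killed
finished = Thread.new { :done }    # exits normally right away
finished.join

# A live thread reports "run" or "sleep"; double negation gives true.
alive_running = !!alive.status

# A normally terminated thread reports false; double negation keeps it false.
finished_running = !!finished.status

alive.kill
alive.join
```

Both terminated cases (false and nil) collapse to false, which is why 'running' can return a plain boolean once the thread exists.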
Convenience method
# File lib/stomp/client.rb, line 243
def set_logger(logger)
  @connection.set_logger(logger)
end
Convenience method for clients
# File lib/stomp/client.rb, line 258
def sha1(data)
  @connection.sha1(data)
end
Subscribe to a destination. A block must be given; it is used as the callback listener.
Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/client.rb, line 144
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?
  # use subscription id to correlate messages to subscription. As described in
  # the SUBSCRIPTION section of the protocol: http://stomp.github.com/.
  # If no subscription id is provided, generate one.
  set_subscription_id_if_missing(destination, headers)
  if @listeners[headers[:id]]
    raise "attempting to subscribe to a queue with a previous subscription"
  end
  @listeners[headers[:id]] = lambda {|msg| yield msg}
  @connection.subscribe(destination, headers)
end
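The listener table behind 'subscribe' can be pictured as a hash from subscription id to callback, with a guard against subscribing twice under the same id. The sketch below is a standalone model, not the library's code: `ListenerTable` and `deliver` are hypothetical, and deriving a missing id from a SHA1 digest of the destination is an assumption, since 'set_subscription_id_if_missing' is not shown on this page.

```ruby
require 'digest/sha1'

# Hypothetical model of the subscription-id => listener table.
class ListenerTable
  def initialize
    @listeners = {}
  end

  def subscribe(destination, headers = {}, &block)
    raise ArgumentError, 'No listener given' unless block
    # Assumption: derive a stable id from the destination when none is supplied.
    headers[:id] ||= Digest::SHA1.hexdigest(destination)
    raise 'duplicate subscription' if @listeners[headers[:id]]
    @listeners[headers[:id]] = block
    headers[:id]
  end

  def unsubscribe(id)
    @listeners.delete(id)
  end

  # Route an incoming message to the listener registered under its id.
  def deliver(id, message)
    listener = @listeners[id]
    listener.call(message) if listener
  end
end

table = ListenerTable.new
got   = []
id    = table.subscribe('/queue/a') { |msg| got << msg }
table.deliver(id, 'hello')
```

Passing an explicit `:id` in the headers would let two independent subscriptions to the same destination coexist in this model.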
Unreceive a message, sending it back to its queue or to the DLQ
# File lib/stomp/client.rb, line 192
def unreceive(message, options = {})
  @connection.unreceive(message, options)
end
Unsubscribe from a channel
# File lib/stomp/client.rb, line 158
def unsubscribe(name, headers = {})
  set_subscription_id_if_missing(name, headers)
  @connection.unsubscribe(name, headers)
  @listeners[headers[:id]] = nil
end
Convenience method for clients
# File lib/stomp/client.rb, line 263
def uuid()
  @connection.uuid()
end
Convenience method
# File lib/stomp/client.rb, line 253
def valid_utf8?(s)
  @connection.valid_utf8?(s)
end