Parent

Files

Stomp::Connection

Low level connection which maps commands and supports synchronous receives

Attributes

connection_frame[R]
disconnect_receipt[R]

Public Class Methods

default_port(ssl) click to toggle source

alias :obj_send :send

# File lib/stomp/connection.rb, line 14
def self.default_port(ssl)
  ssl ? 61612 : 61613
end
new(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) click to toggle source

A new Connection object accepts the following parameters:

login             (String,  default : '')
passcode          (String,  default : '')
host              (String,  default : 'localhost')
port              (Integer, default : 61613)
reliable          (Boolean, default : false)
reconnect_delay   (Integer, default : 5)

e.g. c = Connection.new("username", "password", "localhost", 61613, true)

Hash:

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2,
  :max_reconnect_attempts => 0,
  :randomize => false,
  :backup => false,
  :timeout => -1,
  :connect_headers => {},
  :parse_timeout => 5,
  :logger => nil,
}

e.g. c = Connection.new(hash)

TODO Stomp URL :

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://user:pass@host:port
stomp://user:pass@host.domain.tld:port
# File lib/stomp/connection.rb, line 60
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  @received_messages = []

  if login.is_a?(Hash)
    hashed_initialize(login)
  else
    @host = host
    @port = port
    @login = login
    @passcode = passcode
    @reliable = reliable
    @reconnect_delay = reconnect_delay
    @connect_headers = connect_headers
    @ssl = false
    @parameters = nil
    @parse_timeout = 5              # To override, use hashed parameters
    @logger = nil                   # To override, use hashed parameters
  end
  
  # Use Mutexes:  only one lock per each thread
  # Revert to original implementation attempt
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new
  
  @subscriptions = {}
  @failure = nil
  @connection_attempts = 0
  
  socket
end
open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) click to toggle source

Syntactic sugar for 'Connection.new' See 'initialize' for usage.

# File lib/stomp/connection.rb, line 108
def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers)
end

Public Instance Methods

__old_receive() click to toggle source

Receive a frame, block until the frame is received

# File lib/stomp/connection.rb, line 340
def __old_receive
  # The recive my fail so we may need to retry.
  while TRUE
    begin
      used_socket = socket
      return _receive(used_socket)
    rescue
      @failure = $!
      raise unless @reliable
      errstr = "receive failed: #{$!}"
      if @logger && @logger.respond_to?(:on_miscerr)
        @logger.on_miscerr(log_params, errstr)
      else
        $stderr.print errstr
      end
    end
  end
end
abort(name, headers = {}) click to toggle source

Abort a transaction by name

# File lib/stomp/connection.rb, line 234
def abort(name, headers = {})
  headers[:transaction] = name
  transmit("ABORT", headers)
end
ack(message_id, headers = {}) click to toggle source

Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client'g

Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File lib/stomp/connection.rb, line 222
def ack(message_id, headers = {})
  headers['message-id'] = message_id
  transmit("ACK", headers)
end
begin(name, headers = {}) click to toggle source

Begin a transaction, requires a name for the transaction

# File lib/stomp/connection.rb, line 213
def begin(name, headers = {})
  headers[:transaction] = name
  transmit("BEGIN", headers)
end
change_host() click to toggle source
# File lib/stomp/connection.rb, line 175
def change_host
  @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize]
  
  # Set first as master and send it to the end of array
  current_host = @parameters[:hosts].shift
  @parameters[:hosts] << current_host
  
  @ssl = current_host[:ssl]
  @host = current_host[:host]
  @port = current_host[:port] || Connection::default_port(@ssl)
  @login = current_host[:login] || ""
  @passcode = current_host[:passcode] || ""
  
end
client_ack?(message) click to toggle source
# File lib/stomp/connection.rb, line 314
def client_ack?(message)
  headers = @subscriptions[message.headers[:destination]]
  !headers.nil? && headers[:ack] == "client"
end
closed?() click to toggle source

Is this connection closed?

# File lib/stomp/connection.rb, line 208
def closed?
  @closed
end
commit(name, headers = {}) click to toggle source

Commit a transaction by name

# File lib/stomp/connection.rb, line 228
def commit(name, headers = {})
  headers[:transaction] = name
  transmit("COMMIT", headers)
end
disconnect(headers = {}) click to toggle source

Close this connection

# File lib/stomp/connection.rb, line 320
def disconnect(headers = {})
  transmit("DISCONNECT", headers)
  headers = headers.symbolize_keys
  @disconnect_receipt = receive if headers[:receipt]
  if @logger && @logger.respond_to?(:on_disconnect)
    @logger.on_disconnect(log_params)
  end
  close_socket
end
hashed_initialize(params) click to toggle source
# File lib/stomp/connection.rb, line 92
def hashed_initialize(params)
  
  @parameters = refine_params(params)
  @reliable = true
  @reconnect_delay = @parameters[:initial_reconnect_delay]
  @connect_headers = @parameters[:connect_headers]
  @parse_timeout =  @parameters[:parse_timeout]
  @logger =  @parameters[:logger]
  #sets the first host to connect
  change_host
  if @logger && @logger.respond_to?(:on_connecting)            
    @logger.on_connecting(log_params)
  end
end
increase_reconnect_delay() click to toggle source
# File lib/stomp/connection.rb, line 194
def increase_reconnect_delay

  @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off] 
  @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay]
  
  @reconnect_delay
end
max_reconnect_attempts?() click to toggle source
# File lib/stomp/connection.rb, line 190
def max_reconnect_attempts?
  !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts]
end
obj_send(*args) click to toggle source
# File lib/stomp/connection.rb, line 270
def obj_send(*args)
  __send__(*args)
end
open?() click to toggle source

Is this connection open?

# File lib/stomp/connection.rb, line 203
def open?
  !@closed
end
poll() click to toggle source

Return a pending message if one is available, otherwise return nil

# File lib/stomp/connection.rb, line 332
def poll
  # No need for a read lock here.  The receive method eventually fullfills
  # that requirement.
  return nil if @socket.nil? || !@socket.ready?
  receive
end
publish(destination, message, headers = {}) click to toggle source

Publish message to destination

To disable content length header ( :suppress_content_length => true ) Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File lib/stomp/connection.rb, line 265
def publish(destination, message, headers = {})
  headers[:destination] = destination
  transmit("SEND", headers, message)
end
receive() click to toggle source
# File lib/stomp/connection.rb, line 359
def receive
  super_result = __old_receive
  if super_result.nil? && @reliable
    errstr = "connection.receive returning EOF as nil - resetting connection.\n"
    if @logger && @logger.respond_to?(:on_miscerr)
      @logger.on_miscerr(log_params, errstr)
    else
      $stderr.print errstr
    end
    @socket = nil
    super_result = __old_receive
  end
  return super_result
end
refine_params(params) click to toggle source
# File lib/stomp/connection.rb, line 153
def refine_params(params)
  params = params.uncamelize_and_symbolize_keys
  
  default_params = {
    :connect_headers => {},
    # Failover parameters
    :initial_reconnect_delay => 0.01,
    :max_reconnect_delay => 30.0,
    :use_exponential_back_off => true,
    :back_off_multiplier => 2,
    :max_reconnect_attempts => 0,
    :randomize => false,
    :backup => false,
    :timeout => -1,
    # Parse Timeout
    :parse_timeout => 5
  }
  
  default_params.merge(params)
    
end
send(*args) click to toggle source
# File lib/stomp/connection.rb, line 274
def send(*args)
  warn("This method is deprecated and will be removed on the next release. Use 'publish' instead")
  publish(*args)
end
socket() click to toggle source
# File lib/stomp/connection.rb, line 112
def socket
  @socket_semaphore.synchronize do
    used_socket = @socket
    used_socket = nil if closed?
    
    while used_socket.nil? || !@failure.nil?
      @failure = nil
      begin
        used_socket = open_socket
        # Open complete
        
        connect(used_socket)
        if @logger && @logger.respond_to?(:on_connected)
          @logger.on_connected(log_params) 
        end
        @connection_attempts = 0
      rescue
        @failure = $!
        used_socket = nil
        raise unless @reliable
        if @logger && @logger.respond_to?(:on_connectfail)            
          @logger.on_connectfail(log_params) 
        else
          $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n"
        end
        raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts?

        sleep(@reconnect_delay)
        
        @connection_attempts += 1
        
        if @parameters
          change_host
          increase_reconnect_delay
        end
      end
    end
    @socket = used_socket
  end
end
subscribe(name, headers = {}, subId = nil) click to toggle source

Subscribe to a destination, must specify a name

# File lib/stomp/connection.rb, line 240
def subscribe(name, headers = {}, subId = nil)
  headers[:destination] = name
  transmit("SUBSCRIBE", headers)

  # Store the sub so that we can replay if we reconnect.
  if @reliable
    subId = name if subId.nil?
    @subscriptions[subId] = headers
  end
end
unreceive(message, options = {}) click to toggle source

Send a message back to the source or to the dead letter queue

Accepts a dead letter queue option ( :dead_letter_queue => "/queue/DLQ" ) Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ) Accepts a force client acknowledgement option (:force_client_ack => true)

# File lib/stomp/connection.rb, line 284
def unreceive(message, options = {})
  options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge options
  # Lets make sure all keys are symbols
  message.headers = message.headers.symbolize_keys
  
  retry_count = message.headers[:retry_count].to_i || 0
  message.headers[:retry_count] = retry_count + 1
  transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
  message_id = message.headers.delete(:'message-id')
  
  begin
    self.begin transaction_id
    
    if client_ack?(message) || options[:force_client_ack]
      self.ack(message_id, :transaction => transaction_id)
    end
    
    if retry_count <= options[:max_redeliveries]
      self.publish(message.headers[:destination], message.body, message.headers.merge(:transaction => transaction_id))
    else
      # Poison ack, sending the message to the DLQ
      self.publish(options[:dead_letter_queue], message.body, message.headers.merge(:transaction => transaction_id, :original_destination => message.headers[:destination], :persistent => true))
    end
    self.commit transaction_id
  rescue Exception => exception
    self.abort transaction_id
    raise exception
  end
end
unsubscribe(name, headers = {}, subId = nil) click to toggle source

Unsubscribe from a destination, must specify a name

# File lib/stomp/connection.rb, line 252
def unsubscribe(name, headers = {}, subId = nil)
  headers[:destination] = name
  transmit("UNSUBSCRIBE", headers)
  if @reliable
    subId = name if subId.nil?
    @subscriptions.delete(subId)
  end
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.