class Qpid::Proton::Reactor::Container

A representation of the AMQP concept of a container which, loosely speaking, is something that establishes links to or from another container on which messages are transferred.

This is an extension to the Reactor classthat adds convenience methods for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender and Qpid::Proton::Receiver.

@example

Attributes

container_id[RW]
global_handler[RW]

Public Class Methods

new(handlers, options = {}) click to toggle source
Calls superclass method
# File lib/reactor/container.rb, line 58
def initialize(handlers, options = {})
  super(handlers, options)

  # only do the following if we're creating a new instance
  if !options.has_key?(:impl)
    @ssl = SSLConfig.new
    if options[:global_handler]
      self.global_handler = GlobalOverrides.new(options[:global_handler])
    else
      # very ugly, but using self.global_handler doesn't work in the constructor
      ghandler = Reactor.instance_method(:global_handler).bind(self).call
      ghandler = GlobalOverrides.new(ghandler)
      Reactor.instance_method(:global_handler=).bind(self).call(ghandler)
    end
    @trigger = nil
    @container_id = generate_uuid
  end
end

Public Instance Methods

_session(context) click to toggle source
# File lib/reactor/container.rb, line 110
def _session(context)
  if context.is_a?(Qpid::Proton::URL)
    return self._session(self.connect(:url => context))
  elsif context.is_a?(Qpid::Proton::Session)
    return context
  elsif context.is_a?(Qpid::Proton::Connection)
    if context.session_policy?
      return context.session_policy.session(context)
    else
      return self.create_session(context)
    end
  else
    return context.session
  end
end
connect(options = {}) click to toggle source

Initiates the establishment of an AMQP connection.

@param options [Hash] A hash of named arguments.

# File lib/reactor/container.rb, line 81
def connect(options = {})
  conn = self.connection(options[:handler])
  conn.container = self.container_id || generate_uuid
  connector = Connector.new(conn)
  conn.overrides = connector
  if !options[:url].nil?
    connector.address = URLs.new([options[:url]])
  elsif !options[:urls].nil?
    connector.address = URLs.new(options[:urls])
  elsif !options[:address].nil?
    connector.address = URLs.new([Qpid::Proton::URL.new(options[:address])])
  else
    raise ::ArgumentError.new("either :url or :urls or :address required")
  end

  connector.heartbeat = options[:heartbeat] if !options[:heartbeat].nil?
  if !options[:reconnect].nil?
    connector.reconnect = options[:reconnect]
  else
    connector.reconnect = Backoff.new()
  end

  connector.ssl_domain = SessionPerConnection.new # TODO seems this should be configurable

  conn.open

  return conn
end
create_receiver(context, opts = {}) click to toggle source

Initiates the establishment of a link over which messages can be received.

There are two accepted arguments for the context

1. If a Connection is supplied then the link is established using that

object. The source, and optionally the target, address can be supplied

2. If it is a String or a URL then a new Connection is created on which

the link will be attached. If a path is specified, but not the source address, then the path of the URL is used as the target address.

The name will be generated for the link if one is not specified.

@param context [Connection, URL, String] The connection or the address. @param opts [Hash] Additional otpions. @option opts [String, Qpid::Proton::URL] The source address. @option opts [String] :target The target address @option opts [String] :name The link name. @option opts [Boolean] :dynamic @option opts [Object] :handler @option opts [Hash] :options Additional link options.

@return [Receiver

# File lib/reactor/container.rb, line 185
def create_receiver(context, opts = {})
  if context.is_a?(::String)
    context = Qpid::Proton::URL.new(context)
  end

  source = opts[:source]
  if context.is_a?(Qpid::Proton::URL) && source.nil?
    source = context.path
  end

  session = self._session(context)

  receiver = session.receiver(opts[:name] ||
                              id(session.connection.container,
                                  source, opts[:target]))
  receiver.source.address = source if source
  receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic]
  receiver.target.address = opts[:target] if !opts[:target].nil?
  receiver.handler = opts[:handler] if !opts[:handler].nil?
  self._apply_link_options(opts[:options], receiver)
  receiver.open
  return receiver
end
create_sender(context, opts = {}) click to toggle source

Initiates the establishment of a link over which messages can be sent.

@param context [String, URL] The context. @param opts [Hash] Additional options. @param opts [String, Qpid::Proton::URL] The target address. @param opts [String] :source The source address. @param opts [Boolean] :dynamic @param opts [Object] :handler @param opts [Object] :tag_generator The tag generator. @param opts [Hash] :options Addtional link options

@return [Sender] The sender.

# File lib/reactor/container.rb, line 139
def create_sender(context, opts = {})
  if context.is_a?(::String)
    context = Qpid::Proton::URL.new(context)
  end

  target = opts[:target]
  if context.is_a?(Qpid::Proton::URL) && target.nil?
    target = context.path
  end

  session = self._session(context)

  sender = session.sender(opts[:name] ||
                          id(session.connection.container,
                            target, opts[:source]))
    sender.source.address = opts[:source] if !opts[:source].nil?
    sender.target.address = target if target
    sender.handler = opts[:handler] if !opts[:handler].nil?
    sender.tag_generator = opts[:tag_generator] if !opts[:tag_gnenerator].nil?
    self._apply_link_options(opts[:options], sender)
    sender.open
    return sender
end
declare_transaction(context, handler = nil, settle_before_discharge = false) click to toggle source
# File lib/reactor/container.rb, line 209
def declare_transaction(context, handler = nil, settle_before_discharge = false)
  if context.respond_to? :txn_ctl && !context.__send__(:txn_ctl).nil?
    class << context
      attr_accessor :txn_ctl
    end
    context.txn_ctl = self.create_sender(context, nil, "txn-ctl",
    InternalTransactionHandler.new())
  end
  return Transaction.new(context.txn_ctl, handler, settle_before_discharge)
end
do_work(timeout = nil) click to toggle source
# File lib/reactor/container.rb, line 239
def do_work(timeout = nil)
  self.timeout = timeout unless timeout.nil?
  self.process
end
id(container, remote, local) click to toggle source
# File lib/reactor/container.rb, line 244
def id(container, remote, local)
  if !local.nil? && !remote.nil?
    "#{container}-#{remote}-#{local}"
  elsif !local.nil?
    "#{container}-#{local}"
  elsif !remote.nil?
    "#{container}-#{remote}"
  else
    "#{container}-#{generate_uuid}"
  end
end
listen(url, ssl_domain = nil) click to toggle source

Initiates a server socket, accepting incoming AMQP connections on the interface and port specified.

@param url [] @param ssl_domain []

# File lib/reactor/container.rb, line 226
def listen(url, ssl_domain = nil)
  url = Qpid::Proton::URL.new(url)
  acceptor = self.acceptor(url.host, url.port)
  ssl_config = ssl_domain
  if ssl_config.nil? && (url.scheme == 'amqps') && @ssl
    ssl_config = @ssl.server
  end
  if !ssl_config.nil?
    acceptor.ssl_domain(ssl_config)
  end
  return acceptor
end
to_s() click to toggle source
# File lib/reactor/container.rb, line 266
def to_s
  "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>"
end