class Qpid::Proton::Connection
An AMQP connection.
Constants
- PROTON_METHOD_PREFIX
Attributes
@return [Container] the container managing this connection
@return [WorkQueue] work queue to execute code serialized correctly for this connection
Public Class Methods
@private
# File lib/core/connection.rb, line 34 def initialize(impl = Cproton.pn_connection) super() @impl = impl @overrides = nil @session_policy = nil @link_count = 0 @link_prefix = "" self.class.store_instance(self, :pn_connection_attachments) end
@private
# File lib/core/connection.rb, line 28 def self.wrap(impl) return nil if impl.nil? self.fetch_instance(impl, :pn_connection_attachments) || Connection.new(impl) end
Public Instance Methods
@private
# File lib/core/connection.rb, line 128 def apply opts # NOTE: Only connection options are set here. # Transport options must be applied with {Transport#apply} @container = opts[:container] cid = opts[:container_id] || (@container && @container.id) || SecureRandom.uuid cid = cid.to_s if cid.is_a? Symbol # Allow symbols as container name Cproton.pn_connection_set_container(@impl, cid) Cproton.pn_connection_set_user(@impl, opts[:user]) if opts[:user] Cproton.pn_connection_set_password(@impl, opts[:password]) if opts[:password] Cproton.pn_connection_set_hostname(@impl, opts[:virtual_host]) if opts[:virtual_host] @link_prefix = opts[:link_prefix] || cid Codec::Data.from_object(Cproton.pn_connection_offered_capabilities(@impl), opts[:offered_capabilities]) Codec::Data.from_object(Cproton.pn_connection_desired_capabilities(@impl), opts[:desired_capabilities]) Codec::Data.from_object(Cproton.pn_connection_properties(@impl), opts[:properties]) end
Closes the local end of the connection. The remote end may or may not be closed. @param error [Condition] Optional error condition to send with the close.
# File lib/core/connection.rb, line 174 def close(error=nil) Condition.assign(_local_condition, error) Cproton.pn_connection_close(@impl) end
@return [Connection] self
# File lib/core/connection.rb, line 65 def connection() self; end
@return AMQP container ID advertised by the remote peer. To get the local container ID use {#container} and {Container#id}
# File lib/core/connection.rb, line 72 def container_id() Cproton.pn_connection_remote_container(@impl); end
Returns the default session for this connection.
@return [Session] The session.
# File lib/core/connection.rb, line 196 def default_session @session ||= open_session end
@return [Array<Symbol>] desired capabilities provided by the remote peer
# File lib/core/connection.rb, line 86 def desired_capabilities # Provide capabilities consistently as an array, even if encoded as a single symbol Codec::Data.to_multiple(Cproton.pn_connection_remote_desired_capabilities(@impl)) end
Get the links on this connection. @overload #each_link
@yieldparam l [Link] pass each link to block
@overload #each_link
@return [Enumerator] enumerator over links
# File lib/core/connection.rb, line 250 def each_link return enum_for(:each_link) unless block_given? l = Cproton.pn_link_head(@impl, 0); while l l2 = l # get next before yield, in case yield closes l and unlinks it l = Cproton.pn_link_next(l, 0) yield Link.wrap(l2) end self end
Get the {Receiver} links - see {#each_link}
# File lib/core/connection.rb, line 268 def each_receiver() return enum_for(:each_receiver) unless block_given? each_link.select { |l| yield l if l.receiver? } end
Get the {Sender} links - see {#each_link}
# File lib/core/connection.rb, line 262 def each_sender() return enum_for(:each_sender) unless block_given? each_link.select { |l| yield l if l.sender? } end
Get the sessions on this connection. @overload #each_session
@yieldparam s [Session] pass each session to block
@overload #each_session
@return [Enumerator] enumerator over sessions
# File lib/core/connection.rb, line 229 def each_session(&block) return enum_for(:each_session) unless block_given? s = Cproton.pn_session_head(@impl, 0); while s yield Session.wrap(s) s = Cproton.pn_session_next(s, 0) end self end
@deprecated use {#condition}
# File lib/core/connection.rb, line 280 def error deprecated __method__, "#condition" Cproton.pn_error_code(Cproton.pn_connection_error(@impl)) end
Idle-timeout advertised by the remote peer, in seconds. @return [Numeric] Idle-timeout advertised by the remote peer, in seconds. @return [nil] if the peer does not advertise an idle time-out
# File lib/core/connection.rb, line 147 def idle_timeout() if transport && (t = transport.remote_idle_timeout) Rational(t, 1000) # More precise than Float end end
@deprecated use {#each_link}
# File lib/core/connection.rb, line 240 def link_head(mask) deprecated __method__, "#each_link" Link.wrap(Cproton.pn_link_head(@impl, mask)) end
@private Generate a unique link name, internal use only.
# File lib/core/connection.rb, line 286 def link_name() @link_prefix + "/" + (@link_count += 1).to_s(32) end
Maximum frame size, in bytes, advertised by the remote peer. See {Connection#open :max_frame_size} @return [Integer] maximum frame size @return [nil] no limit
# File lib/core/connection.rb, line 166 def max_frame_size() raise StateError, "connection not bound to transport" unless transport max = transport.remote_max_frame return max.zero? ? nil : max end
Session limit advertised by the remote peer. See {Connection#open :max_sessions} @return [Integer] maximum number of sessions per connection allowed by remote peer. @return [nil] no specific limit is set.
# File lib/core/connection.rb, line 156 def max_sessions() raise StateError, "connection not bound to transport" unless transport max = transport.remote_channel_max return max.zero? ? nil : max end
@return [Array<Symbol>] offered capabilities provided by the remote peer
# File lib/core/connection.rb, line 79 def offered_capabilities # Provide capabilities consistently as an array, even if encoded as a single symbol Codec::Data.to_multiple(Cproton.pn_connection_remote_offered_capabilities(@impl)) end
Open the local end of the connection.
@option opts [MessagingHandler] :handler handler for events related to this connection.
@option opts [String] :user User name for authentication @option opts [String] :password Authentication secret @option opts [String] :virtual_host Virtual host name @option opts [String] :container_id (provided by {Container}) override advertised container-id
@option opts [Hash<Symbol=>Object>] :properties Application-defined properties @option opts [Array<Symbol>] :offered_capabilities Extensions the endpoint supports @option opts [Array<Symbol>] :desired_capabilities Extensions the endpoint can use
@option opts [Numeric] :idle_timeout Seconds before closing an idle connection @option opts [Integer] :max_sessions Limit the number of active sessions @option opts [Integer] :max_frame_size Limit the size of AMQP frames
@option opts [Boolean] :sasl_enabled (false) Enable or disable SASL. @option opts [Boolean] :sasl_allow_insecure_mechs (false) Allow mechanisms send secrets in clear text @option opts [String] :sasl_allowed_mechs SASL mechanisms allowed by this end of the connection
@option opts [SSLDomain] :ssl_domain SSL configuration domain.
# File lib/core/connection.rb, line 121 def open(opts=nil) return if local_active? apply opts if opts Cproton.pn_connection_open(@impl) end
Open a on the #default_session @option opts (see Qpid::Proton::Session#open_receiver)
# File lib/core/connection.rb, line 216 def open_receiver(opts=nil) default_session.open_receiver(opts) end
Open a sender on the #default_session @option opts (see Qpid::Proton::Session#open_sender)
# File lib/core/connection.rb, line 212 def open_sender(opts=nil) default_session.open_sender(opts) end
Open a new session on this connection.
# File lib/core/connection.rb, line 204 def open_session s = Session.wrap(Cproton.pn_session(@impl)) s.open return s end
@deprecated no replacement
# File lib/core/connection.rb, line 59 def overrides?() deprecated __method__; false; end
@return [Hash] connection-properties provided by the remote peer
# File lib/core/connection.rb, line 93 def properties Codec::Data.to_object(Cproton.pn_connection_remote_properties(@impl)) end
@deprecated use {#each_session}
# File lib/core/connection.rb, line 219 def session_head(mask) deprecated __method__, "#each_session" Session.wrap(Cproton.pn_session_head(@impl, mask)) end
@deprecated no replacement
# File lib/core/connection.rb, line 62 def session_policy?() deprecated __method__; false; end
Gets the endpoint current state flags
@see Endpoint#LOCAL_UNINIT @see Endpoint#LOCAL_ACTIVE @see Endpoint#LOCAL_CLOSED @see Endpoint#LOCAL_MASK
@return [Integer] The state flags.
# File lib/core/connection.rb, line 188 def state Cproton.pn_connection_state(@impl) end
@return [Transport, nil] transport bound to this connection, or nil if unbound.
# File lib/core/connection.rb, line 68 def transport() Transport.wrap(Cproton.pn_connection_transport(@impl)); end
@return [String] User name used for authentication (outgoing connection) or the authenticated user name (incoming connection)
# File lib/core/connection.rb, line 54 def user() Cproton.pn_connection_get_user(impl) or (connection.transport && connection.transport.user) end
@return [String] The AMQP hostname for the connection.
# File lib/core/connection.rb, line 45 def virtual_host() Cproton.pn_connection_remote_hostname(@impl); end
@deprecated use {#MessagingHandler} to handle work
# File lib/core/connection.rb, line 274 def work_head deprecated __method__ Delivery.wrap(Cproton.pn_work_head(@impl)) end
Protected Instance Methods
# File lib/core/connection.rb, line 295 def _local_condition Cproton.pn_connection_condition(@impl) end
# File lib/core/connection.rb, line 299 def _remote_condition Cproton.pn_connection_remote_condition(@impl) end