class Mongo::Cluster
Represents a group of servers on the server side, either as a single server, a replica set, or a single or multiple mongos.
@since 2.0.0
Constants
- IDLE_WRITE_PERIOD_SECONDS
How often an idle primary writes a no-op to the oplog.
@since 2.4.0
- MAX_READ_RETRIES
The default number of mongos read retries.
@since 2.1.1
- MAX_WRITE_RETRIES
The default number of mongos write retries.
@since 2.4.2
- READ_RETRY_INTERVAL
The default mongos read retry interval, in seconds.
@since 2.1.1
Attributes
@return [ Mongo::Cluster::AppMetadata ] The application metadata, used for connection
handshakes.
@since 2.4.0
@return [ Monitoring ] monitoring The monitoring.
@return [ Hash ] The options hash.
@return [ Object ] The cluster topology.
Public Class Methods
Create a cluster for the provided client, for use when we don't want the client's original cluster instance to be the same.
@api private
@example Create a cluster for the client.
Cluster.create(client)
@param [ Client ] client The client to create on.
@return [ Cluster ] The cluster.
@since 2.0.0
# File lib/mongo/cluster.rb, line 424 def self.create(client) cluster = Cluster.new( client.cluster.addresses.map(&:to_s), client.instance_variable_get(:@monitoring).dup, client.options ) client.instance_variable_set(:@cluster, cluster) end
Finalize the cluster for garbage collection. Disconnects all the scoped connection pools.
@example Finalize the cluster.
Cluster.finalize(pools)
@param [ Hash<Address, Server::ConnectionPool> ] pools The connection
pools.
@return [ Proc ] The Finalizer.
@since 2.2.0
# File lib/mongo/cluster.rb, line 197 def self.finalize(pools, cursor_reaper) proc do begin; cursor_reaper.kill_cursors; rescue; end cursor_reaper.stop! pools.values.each do |pool| pool.disconnect! end end end
Instantiate the new cluster.
@api private
@example Instantiate the cluster.
Mongo::Cluster.new(["127.0.0.1:27017"], monitoring)
@note Cluster should never be directly instantiated outside of a Client.
@param [ Array<String> ] seeds The addresses of the configured servers. @param [ Monitoring ] monitoring The monitoring. @param [ Hash ] options The options.
@since 2.0.0
# File lib/mongo/cluster.rb, line 152 def initialize(seeds, monitoring, options = Options::Redacted.new) @addresses = [] @servers = [] @monitoring = monitoring @event_listeners = Event::Listeners.new @options = options.freeze @app_metadata ||= AppMetadata.new(self) @update_lock = Mutex.new @pool_lock = Mutex.new @topology = Topology.initial(seeds, monitoring, options) publish_sdam_event( Monitoring::TOPOLOGY_OPENING, Monitoring::Event::TopologyOpening.new(@topology) ) subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self)) subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self)) subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self)) seeds.each{ |seed| add(seed) } publish_sdam_event( Monitoring::TOPOLOGY_CHANGED, Monitoring::Event::TopologyChanged.new(@topology, @topology) ) if @servers.size > 1 @cursor_reaper = CursorReaper.new @cursor_reaper.run! ObjectSpace.define_finalizer(self, self.class.finalize(pools, @cursor_reaper)) end
Public Instance Methods
Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.
@example Is the cluster equal to the object?
cluster == other
@param [ Object ] other The object to compare to.
@return [ true, false ] If the objects are equal.
@since 2.0.0
# File lib/mongo/cluster.rb, line 81 def ==(other) return false unless other.is_a?(Cluster) addresses == other.addresses && options == other.options end
Add a server to the cluster with the provided address. Useful in auto-discovery of new servers when an existing server executes an ismaster and potentially non-configured servers were included.
@example Add the server for the address to the cluster.
cluster.add('127.0.0.1:27018')
@param [ String ] host The address of the server to add.
@return [ Server ] The newly added server, if not present already.
@since 2.0.0
# File lib/mongo/cluster.rb, line 98 def add(host) address = Address.new(host, options) if !addresses.include?(address) if addition_allowed?(address) @update_lock.synchronize { @addresses.push(address) } server = Server.new(address, self, @monitoring, event_listeners, options) @update_lock.synchronize { @servers.push(server) } server end end end
Add hosts in a description to the cluster.
@example Add hosts in a description to the cluster.
cluster.add_hosts(description)
@param [ Mongo::Server::Description ] description The description.
@since 2.0.6
# File lib/mongo/cluster.rb, line 389 def add_hosts(description) if topology.add_hosts?(description, servers_list) description.servers.each { |s| add(s) } end end
The addresses in the cluster.
@example Get the addresses in the cluster.
cluster.addresses
@return [ Array<Mongo::Address> ] The addresses.
@since 2.0.6
# File lib/mongo/cluster.rb, line 441 def addresses addresses_list end
Disconnect all servers.
@example Disconnect the cluster's servers.
cluster.disconnect!
@return [ true ] Always true.
@since 2.1.0
# File lib/mongo/cluster.rb, line 361 def disconnect! begin; @cursor_reaper.kill_cursors; rescue; end @cursor_reaper.stop! @servers.each { |server| server.disconnect! } and true end
Elect a primary server from the description that has just changed to a primary.
@example Elect a primary server.
cluster.elect_primary!(description)
@param [ Server::Description ] description The newly elected primary.
@return [ Topology ] The cluster topology.
@since 2.0.0
# File lib/mongo/cluster.rb, line 245 def elect_primary!(description) @topology = topology.elect_primary(description, servers_list) end
Determine if the cluster would select a readable server for the provided read preference.
@example Is a readable server present?
topology.has_readable_server?(server_selector)
@param [ ServerSelector ] server_selector The server
selector.
@return [ true, false ] If a readable server is present.
@since 2.4.0
# File lib/mongo/cluster.rb, line 122 def has_readable_server?(server_selector = nil) topology.has_readable_server?(self, server_selector) end
Determine if the cluster would select a writable server.
@example Is a writable server present?
topology.has_writable_server?
@return [ true, false ] If a writable server is present.
@since 2.4.0
# File lib/mongo/cluster.rb, line 134 def has_writable_server? topology.has_writable_server?(self) end
Get the nicer formatted string for use in inspection.
@example Inspect the cluster.
cluster.inspect
@return [ String ] The cluster inspection.
@since 2.0.0
# File lib/mongo/cluster.rb, line 215 def inspect "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.display_name}>" end
Get the maximum number of times the cluster can retry a read operation on a mongos.
@example Get the max read retries.
cluster.max_read_retries
@return [ Integer ] The maximum retries.
@since 2.1.1
# File lib/mongo/cluster.rb, line 258 def max_read_retries options[:max_read_retries] || MAX_READ_RETRIES end
Get the next primary server we can send an operation to.
@example Get the next primary server.
cluster.next_primary
@param [ true, false ] ping Whether to ping the server before selection.
@return [ Mongo::Server ] A primary server.
@since 2.0.0
# File lib/mongo/cluster.rb, line 229 def next_primary(ping = true) @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY) @primary_selector.select_server(self, ping) end
Get the scoped connection pool for the server.
@example Get the connection pool.
cluster.pool(server)
@param [ Server ] server The server.
@return [ Server::ConnectionPool ] The connection pool.
@since 2.2.0
# File lib/mongo/cluster.rb, line 272 def pool(server) @pool_lock.synchronize do pools[server.address] ||= Server::ConnectionPool.get(server) end end
Get the interval, in seconds, in which a mongos read operation is retried.
@example Get the read retry interval.
cluster.read_retry_interval
@return [ Float ] The interval.
@since 2.1.1
# File lib/mongo/cluster.rb, line 287 def read_retry_interval options[:read_retry_interval] || READ_RETRY_INTERVAL end
Reconnect all servers.
@example Reconnect the cluster's servers.
cluster.reconnect!
@return [ true ] Always true.
@since 2.1.0
# File lib/mongo/cluster.rb, line 375 def reconnect! scan! servers.each { |server| server.reconnect! } @cursor_reaper.restart! and true end
Remove the server from the cluster for the provided address, if it exists.
@example Remove the server from the cluster.
server.remove('127.0.0.1:27017')
@param [ String ] host The host/port or socket address.
@since 2.0.0
# File lib/mongo/cluster.rb, line 313 def remove(host) address = Address.new(host) removed_servers = @servers.select { |s| s.address == address } @update_lock.synchronize { @servers = @servers - removed_servers } removed_servers.each{ |server| server.disconnect! } if removed_servers publish_sdam_event( Monitoring::SERVER_CLOSED, Monitoring::Event::ServerClosed.new(address, topology) ) @update_lock.synchronize { @addresses.reject! { |addr| addr == address } } end
Remove hosts in a description from the cluster.
@example Remove hosts in a description from the cluster.
cluster.remove_hosts(description)
@param [ Mongo::Server::Description ] description The description.
@since 2.0.6
# File lib/mongo/cluster.rb, line 403 def remove_hosts(description) if topology.remove_hosts?(description) servers_list.each do |s| remove(s.address.to_s) if topology.remove_server?(description, s) end end end
Force a scan of all known servers in the cluster.
@example Force a full cluster scan.
cluster.scan!
@note This operation is done synchronously. If servers in the cluster are
down or slow to respond this can potentially be a slow operation.
@return [ true ] Always true.
@since 2.0.0
# File lib/mongo/cluster.rb, line 336 def scan! servers_list.each{ |server| server.scan! } and true end
Get a list of server candidates from the cluster that can have operations executed on them.
@example Get the server candidates for an operation.
cluster.servers
@return [ Array<Server> ] The candidate servers.
@since 2.0.0
# File lib/mongo/cluster.rb, line 349 def servers topology.servers(servers_list.compact).compact end
Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.
@example Notify the cluster that a standalone server was discovered.
cluster.standalone_discovered
@return [ Topology ] The cluster topology.
@since 2.0.6
# File lib/mongo/cluster.rb, line 300 def standalone_discovered @topology = topology.standalone_discovered end
Private Instance Methods
# File lib/mongo/cluster.rb, line 451 def addition_allowed?(address) !@topology.single? || direct_connection?(address) end
# File lib/mongo/cluster.rb, line 467 def addresses_list @update_lock.synchronize do @addresses.reduce([]) do |addresses, address| addresses << address end end end
# File lib/mongo/cluster.rb, line 447 def direct_connection?(address) address.seed == @topology.seed end
# File lib/mongo/cluster.rb, line 455 def pools @pools ||= {} end
# File lib/mongo/cluster.rb, line 459 def servers_list @update_lock.synchronize do @servers.reduce([]) do |servers, server| servers << server end end end