module Mongo::ServerSelector::Selectable

Provides common behavior for filtering a list of servers by server mode or tag set.

@since 2.0.0

Attributes

max_staleness[R]

@return [ Integer ] max_staleness The maximum replication lag, in seconds, that a

secondary can suffer and still be eligible for a read.

@since 2.4.0

options[R]

@return [ Hash ] options The options.

tag_sets[R]

@return [ Array ] tag_sets The tag sets used to select servers.

Public Class Methods

new(options = nil) click to toggle source

Initialize the server selector.

@example Initialize the selector.

Mongo::ServerSelector::Secondary.new(:tag_sets => [{'dc' => 'nyc'}])

@example Initialize the preference with no options.

Mongo::ServerSelector::Secondary.new

@param [ Hash ] options The server preference options.

@option options [ Integer ] :local_threshold The local threshold boundary for

nearest selection in seconds.

@option options [ Integer ] max_staleness The maximum replication lag,

in seconds, that a secondary can suffer and still be eligible for a read.
A value of -1 is treated identically to nil, which is to not
have a maximum staleness.

@raise [ Error::InvalidServerPreference ] If tag sets are specified

but not allowed.

@since 2.0.0

# File lib/mongo/server_selector/selectable.rb, line 44
def initialize(options = nil)
  options = options ? options.dup : {}
  if options[:max_staleness] == -1
    options.delete(:max_staleness)
  end
  @options = options.freeze
  @tag_sets = (options[:tag_sets] || []).freeze
  @max_staleness = options[:max_staleness]
  validate!
end

Public Instance Methods

==(other) click to toggle source

Check equality of two server selector.

@example Check server selector equality.

preference == other

@param [ Object ] other The other preference.

@return [ true, false ] Whether the objects are equal.

@since 2.0.0

# File lib/mongo/server_selector/selectable.rb, line 77
def ==(other)
  name == other.name &&
    tag_sets == other.tag_sets &&
      max_staleness == other.max_staleness
end
candidates(cluster) click to toggle source

Get the potential candidates to select from the cluster.

@example Get the server candidates.

selectable.candidates(cluster)

@param [ Cluster ] cluster The cluster.

@return [ Array<Server> ] The candidate servers.

@since 2.4.0

# File lib/mongo/server_selector/selectable.rb, line 228
def candidates(cluster)
  if cluster.single?
    cluster.servers.each { |server| validate_max_staleness_support!(server) }
  elsif cluster.sharded?
    near_servers(cluster.servers).each { |server| validate_max_staleness_support!(server) }
  else
    validate_max_staleness_value!(cluster) unless cluster.unknown?
    select(cluster.servers)
  end
end
inspect() click to toggle source

Inspect the server selector.

@example Inspect the server selector.

selector.inspect

@return [ String ] The inspection.

@since 2.2.0

# File lib/mongo/server_selector/selectable.rb, line 91
def inspect
  "#<#{self.class.name}:0x#{object_id} tag_sets=#{tag_sets.inspect} max_staleness=#{max_staleness.inspect}>"
end
local_threshold() click to toggle source

Get the local threshold boundary for nearest selection in seconds.

@example Get the local threshold.

selector.local_threshold

@return [ Float ] The local threshold.

@since 2.0.0

@deprecated This setting is now taken from the cluster options when a server is selected.

Will be removed in 3.0.
# File lib/mongo/server_selector/selectable.rb, line 214
def local_threshold
  @local_threshold ||= (options[:local_threshold] || ServerSelector::LOCAL_THRESHOLD)
end
select_server(cluster, ping = nil) click to toggle source

Select a server from eligible candidates.

@example Select a server from the cluster.

selector.select_server(cluster)

@param [ Mongo::Cluster ] cluster The cluster from which to select an eligible server.

@return [ Mongo::Server ] A server matching the server preference.

@since 2.0.0

# File lib/mongo/server_selector/selectable.rb, line 105
      def select_server(cluster, ping = nil)
        if cluster.replica_set?
          validate_max_staleness_value_early!
        end
        if cluster.addresses.empty?
          if Lint.enabled?
            unless cluster.servers.empty?
              raise Error::LintError, "Cluster has no addresses but has servers: #{cluster.servers.map(&:inspect).join(', ')}"
            end
          end
          msg = "Cluster has no addresses, and therefore will never have a server"
          raise Error::NoServerAvailable.new(self, cluster, msg)
        end
=begin Add this check in version 3.0.0
        unless cluster.connected?
          msg = 'Cluster is disconnected'
          raise Error::NoServerAvailable.new(self, cluster, msg)
        end
=end
        @local_threshold = cluster.options[:local_threshold] || LOCAL_THRESHOLD
        @server_selection_timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT
        deadline = Time.now + server_selection_timeout
        while (time_remaining = deadline - Time.now) > 0
          servers = candidates(cluster)
          if Lint.enabled?
            servers.each do |server|
              if server.average_round_trip_time.nil?
                raise Error::LintError, "Server #{server.address} has nil average rtt"
              end
            end
          end
          if servers && !servers.compact.empty?
            unless cluster.topology.compatible?
              raise Error::UnsupportedFeatures, cluster.topology.compatibility_error.to_s
            end

            # This list of servers may be ordered in a specific way
            # by the selector (e.g. for secondary preferred, the first
            # server may be a secondary and the second server may be primary)
            # and we should take the first server here respecting the order
            server = servers.first

            if cluster.topology.single? &&
              cluster.topology.replica_set_name &&
              cluster.topology.replica_set_name != server.description.replica_set_name
            then
              msg = "Cluster topology specifies replica set name #{cluster.topology.replica_set_name}, but the server has replica set name #{server.description.replica_set_name || '<nil>'}"
              raise Error::NoServerAvailable.new(self, cluster, msg)
            end

            return server
          end
          cluster.scan!(false)
          if cluster.server_selection_semaphore
            cluster.server_selection_semaphore.wait(time_remaining)
          else
            if Lint.enabled?
              raise Error::LintError, 'Waiting for server selection without having a server selection semaphore'
            end
            sleep 0.25
          end
        end

        msg = "No #{name} server is available in cluster: #{cluster.summary} " +
                "with timeout=#{server_selection_timeout}, " +
                "LT=#{local_threshold}"
        dead_monitors = []
        cluster.servers_list.each do |server|
          thread = server.monitor.instance_variable_get('@thread')
          if thread.nil? || !thread.alive?
            dead_monitors << server
          end
        end
        if dead_monitors.any?
          msg += ". The following servers have dead monitor threads: #{dead_monitors.map(&:summary).join(', ')}"
        end
        unless cluster.connected?
          msg += ". The cluster is disconnected (client may have been closed)"
        end
        raise Error::NoServerAvailable.new(self, cluster, msg)
      end
server_selection_timeout() click to toggle source

Get the timeout for server selection.

@example Get the server selection timeout, in seconds.

selector.server_selection_timeout

@return [ Float ] The timeout.

@since 2.0.0

@deprecated This setting is now taken from the cluster options when a server is selected.

Will be removed in 3.0.
# File lib/mongo/server_selector/selectable.rb, line 198
def server_selection_timeout
  @server_selection_timeout ||=
    (options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT)
end

Private Instance Methods

filter_stale_servers(candidates, primary = nil) click to toggle source
# File lib/mongo/server_selector/selectable.rb, line 307
def filter_stale_servers(candidates, primary = nil)
  return candidates unless @max_staleness

  if primary
    candidates.select do |server|
      validate_max_staleness_support!(server)
      staleness = (server.last_scan - server.last_write_date) -
                  (primary.last_scan - primary.last_write_date)  +
                  server.heartbeat_frequency_seconds
      staleness <= @max_staleness
    end
  else
    max_write_date = candidates.collect(&:last_write_date).max
    candidates.select do |server|
      validate_max_staleness_support!(server)
      staleness = max_write_date - server.last_write_date + server.heartbeat_frequency_seconds
      staleness <= @max_staleness
    end
  end
end
match_tag_sets(candidates) click to toggle source

Select the servers matching the defined tag sets.

@param [ Array ] candidates List of candidate servers from which those

matching the defined tag sets should be selected.

@return [ Array ] The servers matching the defined tag sets.

@since 2.0.0

# File lib/mongo/server_selector/selectable.rb, line 298
def match_tag_sets(candidates)
  matches = []
  tag_sets.find do |tag_set|
    matches = candidates.select { |server| server.matches_tag_set?(tag_set) }
    !matches.empty?
  end
  matches || []
end
near_servers(candidates = []) click to toggle source

Select the near servers from a list of provided candidates, taking the

local threshold into account.

@param [ Array ] candidates List of candidate servers to select the

near servers from.

@return [ Array ] The near servers.

@since 2.0.0

# File lib/mongo/server_selector/selectable.rb, line 283
def near_servers(candidates = [])
  return candidates if candidates.empty?
  nearest_server = candidates.min_by(&:average_round_trip_time)
  threshold = nearest_server.average_round_trip_time + local_threshold
  candidates.select { |server| server.average_round_trip_time <= threshold }.shuffle!
end
primary(candidates) click to toggle source

Select the primary from a list of provided candidates.

@param [ Array ] candidates List of candidate servers to select the

primary from.

@return [ Array ] The primary.

@since 2.0.0

# File lib/mongo/server_selector/selectable.rb, line 249
def primary(candidates)
  candidates.select do |server|
    server.primary?
  end
end
secondaries(candidates) click to toggle source

Select the secondaries from a list of provided candidates.

@param [ Array ] candidates List of candidate servers to select the

secondaries from.

@return [ Array ] The secondary servers.

@since 2.0.0

# File lib/mongo/server_selector/selectable.rb, line 263
def secondaries(candidates)
  matching_servers = candidates.select(&:secondary?)
  matching_servers = filter_stale_servers(matching_servers, primary(candidates).first)
  matching_servers = match_tag_sets(matching_servers) unless tag_sets.empty?
  # Per server selection spec the server selected MUST be a random
  # one matching staleness and latency requirements.
  # Selectors always pass the output of #secondaries to #nearest
  # which shuffles the server list, fulfilling this requirement.
  matching_servers
end
validate!() click to toggle source
# File lib/mongo/server_selector/selectable.rb, line 328
def validate!
  if !@tag_sets.all? { |set| set.empty? } && !tags_allowed?
    raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_TAG_SUPPORT)
  elsif @max_staleness && !max_staleness_allowed?
    raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_SUPPORT)
  end
end
validate_max_staleness_support!(server) click to toggle source
# File lib/mongo/server_selector/selectable.rb, line 336
def validate_max_staleness_support!(server)
  if @max_staleness && !server.features.max_staleness_enabled?
    raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_WITH_LEGACY_SERVER)
  end
end
validate_max_staleness_value!(cluster) click to toggle source
# File lib/mongo/server_selector/selectable.rb, line 352
def validate_max_staleness_value!(cluster)
  if @max_staleness
    heartbeat_frequency_seconds = cluster.options[:heartbeat_frequency] || Server::Monitor::HEARTBEAT_FREQUENCY
    unless @max_staleness >= [
      SMALLEST_MAX_STALENESS_SECONDS,
      min_cluster_staleness = heartbeat_frequency_seconds + Cluster::IDLE_WRITE_PERIOD_SECONDS,
    ].max
      msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
        "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS}) and (the cluster's heartbeat_frequency " +
        "setting + `Mongo::Cluster::IDLE_WRITE_PERIOD_SECONDS`) (#{min_cluster_staleness})"
      raise Error::InvalidServerPreference.new(msg)
    end
  end
end
validate_max_staleness_value_early!() click to toggle source
# File lib/mongo/server_selector/selectable.rb, line 342
def validate_max_staleness_value_early!
  if @max_staleness
    unless @max_staleness >= SMALLEST_MAX_STALENESS_SECONDS
      msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
        "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS})"
      raise Error::InvalidServerPreference.new(msg)
    end
  end
end