Class: Mongo::Cluster

Inherits:
Object
Extended by:
Forwardable
Includes:
Event::Subscriber, Loggable, Monitoring::Publishable
Defined in:
lib/mongo/cluster.rb,
lib/mongo/cluster/topology.rb,
lib/mongo/cluster/app_metadata.rb,
lib/mongo/cluster/topology/single.rb,
lib/mongo/cluster/topology/sharded.rb,
lib/mongo/cluster/topology/unknown.rb,
lib/mongo/cluster/periodic_executor.rb,
lib/mongo/cluster/topology/replica_set.rb,
lib/mongo/cluster/reapers/cursor_reaper.rb,
lib/mongo/cluster/reapers/socket_reaper.rb

Overview

Represents a group of servers on the server side: a single server, a replica set, or one or more mongos instances.

Since:

  • 2.0.0

Defined Under Namespace

Modules: Topology

Classes: AppMetadata, CursorReaper, PeriodicExecutor, SocketReaper

Constant Summary

MAX_READ_RETRIES = 1

The default number of mongos read retries.

Since:

  • 2.1.1

MAX_WRITE_RETRIES = 1

The default number of mongos write retries.

Since:

  • 2.4.2

READ_RETRY_INTERVAL = 5

The default mongos read retry interval, in seconds.

Since:

  • 2.1.1

IDLE_WRITE_PERIOD_SECONDS = 10

How often an idle primary writes a no-op to the oplog.

Since:

  • 2.4.0

CLUSTER_TIME = 'clusterTime'.freeze

The cluster time key in responses from mongos servers.

Since:

  • 2.5.0

Constants included from Loggable

Loggable::PREFIX

Instance Attribute Summary

Attributes included from Event::Subscriber

#event_listeners

Instance Method Summary

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Methods included from Event::Subscriber

#subscribe_to

Methods included from Monitoring::Publishable

#publish_command, #publish_event, #publish_sdam_event

Constructor Details

#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

Cluster should never be directly instantiated outside of a Client.

Instantiate the new cluster.

Examples:

Instantiate the cluster.

Mongo::Cluster.new(["127.0.0.1:27017"], monitoring)

Parameters:

  • seeds (Array<String>)

    The addresses of the configured servers.

  • monitoring (Monitoring)

    The monitoring.

  • options (Hash) (defaults to: Options::Redacted.new)

    The options.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 169

def initialize(seeds, monitoring, options = Options::Redacted.new)
  @addresses = []
  @servers = []
  @monitoring = monitoring
  @event_listeners = Event::Listeners.new
  @options = options.freeze
  @app_metadata = AppMetadata.new(self)
  @update_lock = Mutex.new
  @pool_lock = Mutex.new
  @cluster_time = nil
  @cluster_time_lock = Mutex.new
  @topology = Topology.initial(seeds, monitoring, options)
  Session::SessionPool.create(self)

  publish_sdam_event(
    Monitoring::TOPOLOGY_OPENING,
    Monitoring::Event::TopologyOpening.new(@topology)
  )

  subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
  subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
  subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self))

  seeds.each{ |seed| add(seed) }

  publish_sdam_event(
    Monitoring::TOPOLOGY_CHANGED,
    Monitoring::Event::TopologyChanged.new(@topology, @topology)
  ) if @servers.size > 1

  @cursor_reaper = CursorReaper.new
  @socket_reaper = SocketReaper.new(self)
  @periodic_executor = PeriodicExecutor.new(@cursor_reaper, @socket_reaper)
  @periodic_executor.run!

  ObjectSpace.define_finalizer(self, self.class.finalize(pools, @periodic_executor, @session_pool))
end

Instance Attribute Details

#app_metadata ⇒ Mongo::Cluster::AppMetadata (readonly)

Returns The application metadata, used for connection handshakes.

Returns:

  • (Mongo::Cluster::AppMetadata)

    The application metadata, used for connection handshakes.

Since:

  • 2.4.0



# File 'lib/mongo/cluster.rb', line 71

def app_metadata
  @app_metadata
end

#cluster_time ⇒ BSON::Document (readonly)

Returns The latest cluster time seen.

Returns:

  • (BSON::Document)

    The latest cluster time seen.

Since:

  • 2.5.0



# File 'lib/mongo/cluster.rb', line 76

def cluster_time
  @cluster_time
end

#monitoring ⇒ Monitoring (readonly)

Returns The monitoring.

Returns:

  • (Monitoring)

    The monitoring.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 62

def monitoring
  @monitoring
end

#options ⇒ Hash (readonly)

Returns The options hash.

Returns:

  • (Hash)

    The options hash.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 59

def options
  @options
end

#session_pool ⇒ Object (readonly)

Since:

  • 2.5.1



# File 'lib/mongo/cluster.rb', line 81

def session_pool
  @session_pool
end

#topology ⇒ Object (readonly)

Returns The cluster topology.

Returns:

  • (Object)

    The cluster topology.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 65

def topology
  @topology
end

Class Method Details

.create(client) ⇒ Cluster

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create a new cluster for the provided client and assign it to that client, for use when the client should not keep sharing its original cluster instance.

Examples:

Create a cluster for the client.

Cluster.create(client)

Parameters:

  • client (Client)

    The client to create on.

Returns:

  • (Cluster)

    The cluster.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 446

def self.create(client)
  cluster = Cluster.new(
    client.cluster.addresses.map(&:to_s),
    client.instance_variable_get(:@monitoring).dup,
    client.options
  )
  client.instance_variable_set(:@cluster, cluster)
end

.finalize(pools, periodic_executor, session_pool) ⇒ Proc

Finalize the cluster for garbage collection. Ends the pooled sessions, stops the periodic executor, and disconnects all the scoped connection pools.

Examples:

Finalize the cluster.

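Cluster.finalize(pools, periodic_executor, session_pool)

Parameters:

  • pools (Hash)

    The connection pools in the cluster, keyed by address.

  • periodic_executor (PeriodicExecutor)

    The periodic executor.

  • session_pool (Session::SessionPool)

    The session pool.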

Returns:

  • (Proc)

    The Finalizer.

Since:

  • 2.2.0



# File 'lib/mongo/cluster.rb', line 220

def self.finalize(pools, periodic_executor, session_pool)
  proc do
    session_pool.end_sessions
    periodic_executor.stop!
    pools.values.each do |pool|
      pool.disconnect!
    end
  end
end
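
The finalizer is built by a class method so that the returned Proc closes only over its three arguments and not over the cluster itself. Ruby's `ObjectSpace.define_finalizer` documentation warns that a finalizer which references the object it is attached to can prevent that object from being collected. The following is a minimal sketch of the pattern, not the driver's code; `FakeResource` and `ToyCluster` are hypothetical stand-ins.

```ruby
# Stand-in for the pools, executor, and session pool (hypothetical).
class FakeResource
  attr_reader :calls

  def initialize
    @calls = []
  end

  def end_sessions; @calls << :end_sessions; end
  def stop!;        @calls << :stop!;        end
  def disconnect!;  @calls << :disconnect!;  end
end

class ToyCluster
  # Built in a class method so the proc captures only its arguments,
  # never the instance being finalized.
  def self.finalize(pools, periodic_executor, session_pool)
    proc do
      session_pool.end_sessions
      periodic_executor.stop!
      pools.values.each(&:disconnect!)
    end
  end

  def initialize(pools, periodic_executor, session_pool)
    ObjectSpace.define_finalizer(
      self, self.class.finalize(pools, periodic_executor, session_pool)
    )
  end
end
```

Calling the returned Proc by hand, as a test might, runs the same cleanup steps in the same order as the listing above.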

Instance Method Details

#==(other) ⇒ true, false

Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.

Examples:

Is the cluster equal to the object?

cluster == other

Parameters:

  • other (Object)

    The object to compare to.

Returns:

  • (true, false)

    If the objects are equal.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 98

def ==(other)
  return false unless other.is_a?(Cluster)
  addresses == other.addresses && options == other.options
end

#add(host) ⇒ Server

Add a server to the cluster with the provided address. Useful for auto-discovery of new servers when an existing server's ismaster response includes servers that were not in the original configuration.

Examples:

Add the server for the address to the cluster.

cluster.add('127.0.0.1:27018')

Parameters:

  • host (String)

    The address of the server to add.

Returns:

  • (Server)

    The newly added server, or nil if the address is already present or the addition is not allowed.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 115

def add(host)
  address = Address.new(host, options)
  if !addresses.include?(address)
    if addition_allowed?(address)
      @update_lock.synchronize { @addresses.push(address) }
      server = Server.new(address, self, @monitoring, event_listeners, options)
      @update_lock.synchronize { @servers.push(server) }
      server
    end
  end
end
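
The return value of #add follows from the nested conditionals: a new server object when the address is unknown, and nil otherwise. The sketch below mirrors that control flow with a hypothetical `ToyRegistry` class and plain strings in place of `Address` and `Server`; it omits the `addition_allowed?` check, and, as in the listing, the membership test happens outside the lock.

```ruby
# A toy registry mirroring the control flow of #add (names are illustrative).
class ToyRegistry
  attr_reader :addresses, :servers

  def initialize
    @addresses = []
    @servers = []
    @update_lock = Mutex.new
  end

  # Returns the new entry, or nil when the address is already known.
  def add(host)
    return nil if @addresses.include?(host)
    @update_lock.synchronize { @addresses.push(host) }
    server = "server@#{host}"
    @update_lock.synchronize { @servers.push(server) }
    server
  end
end

registry = ToyRegistry.new
first  = registry.add('127.0.0.1:27018') # new address, returns the entry
second = registry.add('127.0.0.1:27018') # duplicate, returns nil
```

Callers that need the server either way would have to look it up separately when nil comes back.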

#add_hosts(description) ⇒ Object

Add hosts in a description to the cluster.

Examples:

Add hosts in a description to the cluster.

cluster.add_hosts(description)

Parameters:

  • description (Server::Description)

    The server description.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 411

def add_hosts(description)
  if topology.add_hosts?(description, servers_list)
    description.servers.each { |s| add(s) }
  end
end

#addresses ⇒ Array<Mongo::Address>

The addresses in the cluster.

Examples:

Get the addresses in the cluster.

cluster.addresses

Returns:

  • (Array<Mongo::Address>)

    The addresses.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 463

def addresses
  addresses_list
end

#disconnect! ⇒ true

Disconnect all servers.

Examples:

Disconnect the cluster's servers.

cluster.disconnect!

Returns:

  • (true)

    Always true.

Since:

  • 2.1.0



# File 'lib/mongo/cluster.rb', line 384

def disconnect!
  @periodic_executor.stop!
  @servers.each { |server| server.disconnect! } and true
end

#elect_primary!(description) ⇒ Topology

Elect a primary server from the description that has just changed to a primary.

Examples:

Elect a primary server.

cluster.elect_primary!(description)

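Parameters:

  • description (Server::Description)

    The description of the server that has just become primary.

Returns:

  • (Topology)

    The cluster topology.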

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 268

def elect_primary!(description)
  @topology = topology.elect_primary(description, servers_list)
end

#has_readable_server?(server_selector = nil) ⇒ true, false

Determine if the cluster would select a readable server for the provided read preference.

Examples:

Is a readable server present?

cluster.has_readable_server?(server_selector)

Parameters:

  • server_selector (ServerSelector) (defaults to: nil)

    The server selector.

Returns:

  • (true, false)

    If a readable server is present.

Since:

  • 2.4.0



# File 'lib/mongo/cluster.rb', line 139

def has_readable_server?(server_selector = nil)
  topology.has_readable_server?(self, server_selector)
end

#has_writable_server? ⇒ true, false

Determine if the cluster would select a writable server.

Examples:

Is a writable server present?

cluster.has_writable_server?

Returns:

  • (true, false)

    If a writable server is present.

Since:

  • 2.4.0



# File 'lib/mongo/cluster.rb', line 151

def has_writable_server?
  topology.has_writable_server?(self)
end

#inspect ⇒ String

Get a nicely formatted string for use in inspection.

Examples:

Inspect the cluster.

cluster.inspect

Returns:

  • (String)

    The cluster inspection.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 238

def inspect
  "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.display_name}>"
end

#logical_session_timeout ⇒ Integer?

The logical session timeout value in minutes.

Examples:

Get the logical session timeout in minutes.

cluster.logical_session_timeout

Returns:

  • (Integer, nil)

    The logical session timeout.

Since:

  • 2.5.0



# File 'lib/mongo/cluster.rb', line 475

def logical_session_timeout
  servers.inject(nil) do |min, server|
    break unless timeout = server.logical_session_timeout
    [timeout, (min || timeout)].min
  end
end
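
The `inject` with `break` in this method computes the smallest timeout across the servers, but gives up and returns nil as soon as any server reports no timeout. A minimal sketch of the same logic, using a hypothetical `ToyServer` struct and a free-standing `min_session_timeout` helper in place of real server objects:

```ruby
# Hypothetical stand-in for a server that reports its session timeout.
ToyServer = Struct.new(:logical_session_timeout)

# Smallest timeout across servers, or nil as soon as any server reports none.
def min_session_timeout(servers)
  servers.inject(nil) do |min, server|
    # A bare `break` leaves inject early and makes it return nil.
    break unless (timeout = server.logical_session_timeout)
    [timeout, (min || timeout)].min
  end
end

min_session_timeout([ToyServer.new(30), ToyServer.new(10)])  # => 10
min_session_timeout([ToyServer.new(30), ToyServer.new(nil)]) # => nil
```

An empty server list also yields nil, because `inject(nil)` returns its initial value when there is nothing to iterate.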

#max_read_retries ⇒ Integer

Get the maximum number of times the cluster can retry a read operation on a mongos.

Examples:

Get the max read retries.

cluster.max_read_retries

Returns:

  • (Integer)

    The maximum retries.

Since:

  • 2.1.1



# File 'lib/mongo/cluster.rb', line 281

def max_read_retries
  options[:max_read_retries] || MAX_READ_RETRIES
end

#next_primary(ping = true) ⇒ Mongo::Server

Get the next primary server we can send an operation to.

Examples:

Get the next primary server.

cluster.next_primary

Parameters:

  • ping (true, false) (defaults to: true)

    Whether to ping the server before selection.

Returns:

  • (Mongo::Server)

    A primary server.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 252

def next_primary(ping = true)
  @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY)
  @primary_selector.select_server(self, ping)
end

#pool(server) ⇒ Server::ConnectionPool

Get the scoped connection pool for the server.

Examples:

Get the connection pool.

cluster.pool(server)

Parameters:

  • server (Server)

    The server.

Returns:

  • (Server::ConnectionPool)

    The connection pool.

Since:

  • 2.2.0



# File 'lib/mongo/cluster.rb', line 295

def pool(server)
  @pool_lock.synchronize do
    pools[server.address] ||= Server::ConnectionPool.get(server)
  end
end
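
The `||=` inside `synchronize` means a pool is built at most once per server address, and later calls return the cached object. The sketch below shows that memoization in isolation; `ToyPoolCache` and its `build` block are hypothetical, with the block standing in for `Server::ConnectionPool.get`.

```ruby
# Illustrative cache keyed by address; `build` stands in for
# Server::ConnectionPool.get and is a hypothetical name.
class ToyPoolCache
  def initialize(&build)
    @build = build
    @pools = {}
    @pool_lock = Mutex.new
  end

  # Builds the pool on first use, then returns the cached instance.
  def pool(address)
    @pool_lock.synchronize { @pools[address] ||= @build.call(address) }
  end
end

cache = ToyPoolCache.new { |address| "pool-for-#{address}" }
cache.pool('127.0.0.1:27017').equal?(cache.pool('127.0.0.1:27017')) # => true
```

Holding the lock around both the lookup and the assignment keeps two threads from building separate pools for the same address.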

#read_retry_interval ⇒ Float

Get the interval, in seconds, in which a mongos read operation is retried.

Examples:

Get the read retry interval.

cluster.read_retry_interval

Returns:

  • (Float)

    The interval.

Since:

  • 2.1.1



# File 'lib/mongo/cluster.rb', line 310

def read_retry_interval
  options[:read_retry_interval] || READ_RETRY_INTERVAL
end
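
Both #max_read_retries and #read_retry_interval use the same `||` fallback: the option wins when it is truthy, otherwise the constant applies. Because 0 is truthy in Ruby, an explicit 0 is honoured rather than replaced by the default. A small sketch, with a hypothetical `ToyOptions` module taking the options hash as an argument:

```ruby
# Hypothetical module reproducing the option fallback from the listings.
module ToyOptions
  MAX_READ_RETRIES = 1
  READ_RETRY_INTERVAL = 5

  def self.max_read_retries(options)
    options[:max_read_retries] || MAX_READ_RETRIES
  end

  def self.read_retry_interval(options)
    options[:read_retry_interval] || READ_RETRY_INTERVAL
  end
end

ToyOptions.max_read_retries({})                              # => 1
ToyOptions.max_read_retries({ max_read_retries: 0 })         # => 0
ToyOptions.read_retry_interval({ read_retry_interval: 0.5 }) # => 0.5
```

Only a missing key, nil, or false falls through to the default.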

#reconnect! ⇒ true

Reconnect all servers.

Examples:

Reconnect the cluster's servers.

cluster.reconnect!

Returns:

  • (true)

    Always true.

Since:

  • 2.1.0



# File 'lib/mongo/cluster.rb', line 397

def reconnect!
  scan!
  servers.each { |server| server.reconnect! }
  @periodic_executor.restart! and true
end

#remove(host) ⇒ Object

Remove the server from the cluster for the provided address, if it exists.

Examples:

Remove the server from the cluster.

cluster.remove('127.0.0.1:27017')

Parameters:

  • host (String)

    The host/port or socket address.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 336

def remove(host)
  address = Address.new(host)
  removed_servers = @servers.select { |s| s.address == address }
  @update_lock.synchronize { @servers = @servers - removed_servers }
  removed_servers.each{ |server| server.disconnect! } if removed_servers
  publish_sdam_event(
    Monitoring::SERVER_CLOSED,
    Monitoring::Event::ServerClosed.new(address, topology)
  )
  @update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
end

#remove_hosts(description) ⇒ Object

Remove hosts in a description from the cluster.

Examples:

Remove hosts in a description from the cluster.

cluster.remove_hosts(description)

Parameters:

  • description (Server::Description)

    The server description.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 425

def remove_hosts(description)
  if topology.remove_hosts?(description)
    servers_list.each do |s|
      remove(s.address.to_s) if topology.remove_server?(description, s)
    end
  end
end

#scan! ⇒ true

Note:

This operation is done synchronously. If servers in the cluster are down or slow to respond this can potentially be a slow operation.

Force a scan of all known servers in the cluster.

Examples:

Force a full cluster scan.

cluster.scan!

Returns:

  • (true)

    Always true.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 359

def scan!
  servers_list.each{ |server| server.scan! } and true
end
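
The "Always true" return value comes from the `each { ... } and true` idiom, which #disconnect! also uses. `Array#each` returns its receiver, and every Array is truthy in Ruby, so the expression evaluates to true even for an empty list. A minimal sketch, in which `scan_all` is a hypothetical helper and callables stand in for servers:

```ruby
# Array#each returns its receiver, and every Array (even an empty one) is
# truthy in Ruby, so `each { ... } and true` always evaluates to true.
def scan_all(servers)
  servers.each { |server| server.call } and true
end

scanned = []
scan_all([-> { scanned << :a }, -> { scanned << :b }]) # => true
scan_all([])                                           # => true
```

An exception raised inside the block still propagates, so true only signals that every call returned normally.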

#servers ⇒ Array<Server>

Get a list of server candidates from the cluster that can have operations executed on them.

Examples:

Get the server candidates for an operation.

cluster.servers

Returns:

  • (Array<Server>)

    The candidate servers.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 372

def servers
  topology.servers(servers_list.compact).compact
end

#standalone_discovered ⇒ Topology

Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.

Examples:

Notify the cluster that a standalone server was discovered.

cluster.standalone_discovered

Returns:

  • (Topology)

    The cluster topology.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 323

def standalone_discovered
  @topology = topology.standalone_discovered
end

#update_cluster_time(result) ⇒ Object

Update the max cluster time seen in a response.

Examples:

Update the cluster time.

cluster.update_cluster_time(result)

Parameters:

  • result (Operation::Result)

    The operation result containing the cluster time.

Returns:

  • (Object)

    The cluster time.

Since:

  • 2.5.0



# File 'lib/mongo/cluster.rb', line 492

def update_cluster_time(result)
  if cluster_time_doc = result.cluster_time
    @cluster_time_lock.synchronize do
      if @cluster_time.nil?
        @cluster_time = cluster_time_doc
      elsif cluster_time_doc[CLUSTER_TIME] > @cluster_time[CLUSTER_TIME]
        @cluster_time = cluster_time_doc
      end
    end
  end
end
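
The method keeps whichever cluster time document carries the larger `clusterTime` value, so the stored time never moves backwards. The sketch below isolates that rule; `ToyClusterClock` is hypothetical, and plain integers stand in for the timestamp values a server would return (an assumption made only to keep the example self-contained).

```ruby
# Keeps the largest cluster time seen; integers stand in for the
# timestamp values a real server would return (an assumption).
class ToyClusterClock
  CLUSTER_TIME = 'clusterTime'.freeze

  attr_reader :cluster_time

  def initialize
    @cluster_time = nil
    @cluster_time_lock = Mutex.new
  end

  # Stores doc only when it is the first one or newer than the current one.
  def update(doc)
    return unless doc
    @cluster_time_lock.synchronize do
      if @cluster_time.nil? || doc[CLUSTER_TIME] > @cluster_time[CLUSTER_TIME]
        @cluster_time = doc
      end
    end
  end
end

clock = ToyClusterClock.new
clock.update({ 'clusterTime' => 5 })
clock.update({ 'clusterTime' => 3 }) # older, ignored
clock.cluster_time                   # => {"clusterTime"=>5}
```

The comparison runs under the lock so that two threads reporting different times cannot overwrite a newer value with an older one.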