Class: Mongo::Cluster

Inherits:
Object
Extended by:
Forwardable
Includes:
Event::Subscriber, Loggable, Monitoring::Publishable
Defined in:
lib/mongo/cluster.rb,
lib/mongo/cluster/topology.rb,
lib/mongo/cluster/app_metadata.rb,
lib/mongo/cluster/topology/single.rb,
lib/mongo/cluster/topology/sharded.rb,
lib/mongo/cluster/topology/unknown.rb,
lib/mongo/cluster/periodic_executor.rb,
lib/mongo/cluster/topology/replica_set.rb,
lib/mongo/cluster/reapers/cursor_reaper.rb,
lib/mongo/cluster/reapers/socket_reaper.rb

Overview

Represents a group of servers on the server side: a single server, a replica set, or one or more mongos instances.

Since:

  • 2.0.0

Defined Under Namespace

Modules: Topology

Classes: AppMetadata, CursorReaper, PeriodicExecutor, SocketReaper

Constant Summary

MAX_READ_RETRIES = 1

The default number of mongos read retries.

Since:

  • 2.1.1

MAX_WRITE_RETRIES = 1

The default number of mongos write retries.

Since:

  • 2.4.2

READ_RETRY_INTERVAL = 5

The default mongos read retry interval, in seconds.

Since:

  • 2.1.1

IDLE_WRITE_PERIOD_SECONDS = 10

How often an idle primary writes a no-op to the oplog.

Since:

  • 2.4.0

CLUSTER_TIME = 'clusterTime'.freeze

The cluster time key in responses from mongos servers.

Since:

  • 2.5.0
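As a rough illustration of what the retry defaults mean in practice, the sketch below retries a failing read once after a pause. The `RetryDefaults` module, the `TransientError` class, and the `with_read_retries` helper are hypothetical names invented for this example; this is not the driver's own retry implementation.

```ruby
# Hypothetical stand-ins that mirror the documented default values.
module RetryDefaults
  MAX_READ_RETRIES = 1
  READ_RETRY_INTERVAL = 5
end

# Hypothetical error class representing a retryable failure.
class TransientError < StandardError; end

# Run the block, retrying up to max_retries additional times and
# sleeping for interval seconds between attempts.
def with_read_retries(max_retries: RetryDefaults::MAX_READ_RETRIES,
                      interval: RetryDefaults::READ_RETRY_INTERVAL)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue TransientError
    raise if attempts > max_retries
    sleep(interval)
    retry
  end
end

calls = 0
# interval: 0 keeps the example fast; the documented default is 5 seconds.
result = with_read_retries(interval: 0) do |attempt|
  calls += 1
  raise TransientError, 'mongos unavailable' if attempt == 1
  :ok
end
```

With one retry allowed, the first failure is absorbed and the second attempt supplies the result; a second consecutive failure would be re-raised to the caller.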

Constants included from Loggable

Loggable::PREFIX

Instance Attribute Summary

Attributes included from Event::Subscriber

#event_listeners

Instance Method Summary

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Methods included from Event::Subscriber

#subscribe_to

Methods included from Monitoring::Publishable

#publish_command, #publish_event, #publish_sdam_event

Constructor Details

#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster

This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.

Note:

Cluster should never be directly instantiated outside of a Client.

Note:

When connecting to a mongodb+srv:// URI, the client expands such a URI into a list of servers and passes that list to the Cluster constructor. When connecting to a standalone mongod, the Cluster constructor receives the corresponding address as an array of one string.

Instantiate the new cluster.

Examples:

Instantiate the cluster.

Mongo::Cluster.new(["127.0.0.1:27017"], monitoring)

Parameters:

  • seeds (Array<String>)

    The addresses of the configured servers

  • monitoring (Monitoring)

    The monitoring.

  • options (Hash) (defaults to: Options::Redacted.new)

    Options. The Client constructor forwards its options to the Cluster constructor, although Cluster recognizes only a subset of the options recognized by Client.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 176

def initialize(seeds, monitoring, options = Options::Redacted.new)
  @addresses = []
  @servers = []
  @monitoring = monitoring
  @event_listeners = Event::Listeners.new
  @options = options.freeze
  @app_metadata = AppMetadata.new(self)
  @update_lock = Mutex.new
  @pool_lock = Mutex.new
  @cluster_time = nil
  @cluster_time_lock = Mutex.new
  @topology = Topology.initial(seeds, monitoring, options)
  Session::SessionPool.create(self)

  publish_sdam_event(
    Monitoring::TOPOLOGY_OPENING,
    Monitoring::Event::TopologyOpening.new(@topology)
  )

  subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
  subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
  subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self))

  seeds.each{ |seed| add(seed) }

  publish_sdam_event(
    Monitoring::TOPOLOGY_CHANGED,
    Monitoring::Event::TopologyChanged.new(@topology, @topology)
  ) if @servers.size > 1

  @cursor_reaper = CursorReaper.new
  @socket_reaper = SocketReaper.new(self)
  @periodic_executor = PeriodicExecutor.new(@cursor_reaper, @socket_reaper)
  @periodic_executor.run!

  ObjectSpace.define_finalizer(self, self.class.finalize(pools, @periodic_executor, @session_pool))
end

Instance Attribute Details

#app_metadata ⇒ Mongo::Cluster::AppMetadata (readonly)

Returns the application metadata, used for connection handshakes.

Returns:

  • (Mongo::Cluster::AppMetadata)

    The application metadata, used for connection handshakes.

Since:

  • 2.4.0



# File 'lib/mongo/cluster.rb', line 71

def app_metadata
  @app_metadata
end

#cluster_time ⇒ BSON::Document (readonly)

Returns the latest cluster time seen.

Returns:

  • (BSON::Document)

    The latest cluster time seen.

Since:

  • 2.5.0



# File 'lib/mongo/cluster.rb', line 76

def cluster_time
  @cluster_time
end

#monitoring ⇒ Monitoring (readonly)

Returns the monitoring.

Returns:

  • (Monitoring)

    The monitoring.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 62

def monitoring
  @monitoring
end

#options ⇒ Hash (readonly)

Returns the options hash.

Returns:

  • (Hash)

    The options hash.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 59

def options
  @options
end

#session_pool ⇒ Object (readonly)

Since:

  • 2.5.1



# File 'lib/mongo/cluster.rb', line 81

def session_pool
  @session_pool
end

#topology ⇒ Object (readonly)

Returns the cluster topology.

Returns:

  • (Object)

    The cluster topology.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 65

def topology
  @topology
end

Class Method Details

.create(client) ⇒ Cluster

This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.

Create a new cluster from the provided client's addresses and options and assign it to that client, for use when the client should not keep sharing its original cluster instance.

Examples:

Create a cluster for the client.

Cluster.create(client)

Parameters:

  • client (Client)

    The client to create on.

Returns:

  • (Cluster)

    The newly created cluster.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 455

def self.create(client)
  cluster = Cluster.new(
    client.cluster.addresses.map(&:to_s),
    Monitoring.new,
    client.options
  )
  client.instance_variable_set(:@cluster, cluster)
end

.finalize(pools, periodic_executor, session_pool) ⇒ Proc

Finalize the cluster for garbage collection. Disconnects all the scoped connection pools.

Examples:

Finalize the cluster.

Cluster.finalize(pools, periodic_executor, session_pool)

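Parameters:

  • pools (Hash)

    The connection pools to disconnect.

  • periodic_executor (PeriodicExecutor)

    The periodic executor to stop.

  • session_pool (Session::SessionPool)

    The session pool whose sessions are ended.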

Returns:

  • (Proc)

    The Finalizer.

Since:

  • 2.2.0



# File 'lib/mongo/cluster.rb', line 227

def self.finalize(pools, periodic_executor, session_pool)
  proc do
    session_pool.end_sessions
    periodic_executor.stop!
    pools.values.each do |pool|
      pool.disconnect!
    end
  end
end
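The finalizer is built by a class method rather than inside the instance, which is a common Ruby pattern: a proc created in an instance method would close over `self` and keep the object reachable. A minimal sketch of that pattern follows; the `Resource` class and the `log` array are hypothetical and only stand in for the cluster and its pools.

```ruby
# Hypothetical class illustrating the finalizer pattern; not driver code.
class Resource
  # Build the finalizer here so the proc captures only `log`,
  # never the Resource instance itself.
  def self.finalize(log)
    proc { log << :released }
  end

  def initialize(log)
    # Register the cleanup proc to run when this object is collected.
    ObjectSpace.define_finalizer(self, self.class.finalize(log))
  end
end

log = []
finalizer = Resource.finalize(log)
# Invoke the proc directly to show what the garbage collector would run.
finalizer.call
```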

Instance Method Details

#==(other) ⇒ true, false

Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.

Examples:

Is the cluster equal to the object?

cluster == other

Parameters:

  • other (Object)

    The object to compare to.

Returns:

  • (true, false)

    If the objects are equal.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 98

def ==(other)
  return false unless other.is_a?(Cluster)
  addresses == other.addresses && options == other.options
end

#add(host) ⇒ Server

Add a server to the cluster with the provided address. Useful for auto-discovery of new servers, when an existing server's ismaster response includes servers that were not part of the original configuration.

Examples:

Add the server for the address to the cluster.

cluster.add('127.0.0.1:27018')

Parameters:

  • host (String)

    The address of the server to add.

Returns:

  • (Server)

    The newly added server, if not present already.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 115

def add(host)
  address = Address.new(host, options)
  if !addresses.include?(address)
    if addition_allowed?(address)
      @update_lock.synchronize { @addresses.push(address) }
      server = Server.new(address, self, @monitoring, event_listeners, options)
      @update_lock.synchronize { @servers.push(server) }
      server
    end
  end
end
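Note that the method returns the new server only when the address was not already known; otherwise it returns nil. A simplified sketch of that behavior, using plain strings in place of addresses and servers (the `TinyCluster` class is hypothetical):

```ruby
# Hypothetical, simplified stand-in for the add logic; not driver code.
class TinyCluster
  attr_reader :addresses

  def initialize
    @addresses = []
    @update_lock = Mutex.new
  end

  # Returns the host when it is newly added, nil when it was already known.
  def add(host)
    return nil if @addresses.include?(host)
    # Mutate the shared list only while holding the lock.
    @update_lock.synchronize { @addresses.push(host) }
    host
  end
end

cluster = TinyCluster.new
first  = cluster.add('127.0.0.1:27018')
second = cluster.add('127.0.0.1:27018')
```

Callers that need to know whether a server was actually added can therefore check the return value for nil.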

#add_hosts(description) ⇒ Object

Add hosts in a description to the cluster.

Examples:

Add hosts in a description to the cluster.

cluster.add_hosts(description)

Parameters:

  • description (Server::Description)

    The server description.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 420

def add_hosts(description)
  if topology.add_hosts?(description, servers_list)
    description.servers.each { |s| add(s) }
  end
end

#addresses ⇒ Array<Mongo::Address>

The addresses in the cluster.

Examples:

Get the addresses in the cluster.

cluster.addresses

Returns:

  • (Array<Mongo::Address>)

    The addresses.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 472

def addresses
  addresses_list
end

#disconnect! ⇒ true

Disconnect all servers.

Examples:

Disconnect the cluster's servers.

cluster.disconnect!

Returns:

  • (true)

    Always true.

Since:

  • 2.1.0



# File 'lib/mongo/cluster.rb', line 393

def disconnect!
  @periodic_executor.stop!
  @servers.each { |server| server.disconnect! } and true
end
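The trailing `and true` works because `Array#each` returns its receiver, and an array is truthy in Ruby even when it is empty. A small stand-alone sketch of the idiom, using hashes as hypothetical stand-ins for servers:

```ruby
# Hypothetical stand-in: each "server" is a plain hash, not a Mongo::Server.
def disconnect_all(servers)
  # Array#each returns the array itself, which is truthy even when empty,
  # so the whole expression always evaluates to true.
  servers.each { |server| server[:connected] = false } and true
end

servers = [{ connected: true }, { connected: true }]
result = disconnect_all(servers)
empty_result = disconnect_all([])
```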

#elect_primary!(description) ⇒ Topology

Elect a primary server from the description that has just changed to a primary.

Examples:

Elect a primary server.

cluster.elect_primary!(description)

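Parameters:

  • description (Server::Description)

    The description of the newly elected primary.

Returns:

  • (Topology)

    The cluster topology.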

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 277

def elect_primary!(description)
  @topology = topology.elect_primary(description, servers_list)
end

#has_readable_server?(server_selector = nil) ⇒ true, false

Determine if the cluster would select a readable server for the provided read preference.

Examples:

Is a readable server present?

cluster.has_readable_server?(server_selector)

Parameters:

  • server_selector (ServerSelector) (defaults to: nil)

    The server selector.

Returns:

  • (true, false)

    If a readable server is present.

Since:

  • 2.4.0



# File 'lib/mongo/cluster.rb', line 139

def has_readable_server?(server_selector = nil)
  topology.has_readable_server?(self, server_selector)
end

#has_writable_server? ⇒ true, false

Determine if the cluster would select a writable server.

Examples:

Is a writable server present?

cluster.has_writable_server?

Returns:

  • (true, false)

    If a writable server is present.

Since:

  • 2.4.0



# File 'lib/mongo/cluster.rb', line 151

def has_writable_server?
  topology.has_writable_server?(self)
end

#inspect ⇒ String

Get a more readable string representation for use in inspection.

Examples:

Inspect the cluster.

cluster.inspect

Returns:

  • (String)

    The cluster inspection.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 245

def inspect
  "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.display_name}>"
end

#logical_session_timeout ⇒ Integer?

The logical session timeout value in minutes.

Examples:

Get the logical session timeout in minutes.

cluster.logical_session_timeout

Returns:

  • (Integer, nil)

    The logical session timeout.

Since:

  • 2.5.0



# File 'lib/mongo/cluster.rb', line 484

def logical_session_timeout
  servers.inject(nil) do |min, server|
    break unless timeout = server.logical_session_timeout
    [timeout, (min || timeout)].min
  end
end
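The `inject` above yields the smallest timeout across all servers, but the `break` makes the whole result nil as soon as any server reports no timeout. The sketch below reproduces that logic with a hypothetical `FakeServer` struct in place of real servers:

```ruby
# Hypothetical stand-in for a server that reports a session timeout in minutes.
FakeServer = Struct.new(:logical_session_timeout)

def min_session_timeout(servers)
  servers.inject(nil) do |min, server|
    # A server without a timeout aborts the fold, so the result is nil.
    break unless (timeout = server.logical_session_timeout)
    [timeout, (min || timeout)].min
  end
end

all_report  = min_session_timeout([FakeServer.new(30), FakeServer.new(20)])
one_missing = min_session_timeout([FakeServer.new(30), FakeServer.new(nil)])
no_servers  = min_session_timeout([])
```

In this sketch `all_report` is the minimum of the reported values, while `one_missing` and `no_servers` are both nil.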

#max_read_retries ⇒ Integer

Get the maximum number of times the cluster can retry a read operation on a mongos.

Examples:

Get the max read retries.

cluster.max_read_retries

Returns:

  • (Integer)

    The maximum retries.

Since:

  • 2.1.1



# File 'lib/mongo/cluster.rb', line 290

def max_read_retries
  options[:max_read_retries] || MAX_READ_RETRIES
end
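The `||` fallback means the default applies only when the option is absent (nil) or false. Because `0` is truthy in Ruby, an explicit zero is honored rather than replaced by the default. A short sketch with a hypothetical free-standing helper and constant:

```ruby
# Hypothetical constant mirroring the documented default of 1.
DEFAULT_MAX_READ_RETRIES = 1

# Hypothetical helper that takes the options hash as an argument.
def max_read_retries_for(options)
  options[:max_read_retries] || DEFAULT_MAX_READ_RETRIES
end

unset    = max_read_retries_for({})
explicit = max_read_retries_for(max_read_retries: 3)
zero     = max_read_retries_for(max_read_retries: 0)
```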

#next_primary(ping = true) ⇒ Mongo::Server

Get the next primary server we can send an operation to.

Examples:

Get the next primary server.

cluster.next_primary

Parameters:

  • ping (true, false) (defaults to: true)

    Whether to ping the server before selection. Deprecated, not necessary with the implementation of the Server Selection specification.

Returns:

  • (Mongo::Server)

    A primary server.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 261

def next_primary(ping = true)
  @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY)
  @primary_selector.select_server(self)
end

#pool(server) ⇒ Server::ConnectionPool

Get the scoped connection pool for the server.

Examples:

Get the connection pool.

cluster.pool(server)

Parameters:

  • server (Server)

    The server.

Returns:

  • (Server::ConnectionPool)

    The connection pool.

Since:

  • 2.2.0



# File 'lib/mongo/cluster.rb', line 304

def pool(server)
  @pool_lock.synchronize do
    pools[server.address] ||= Server::ConnectionPool.get(server)
  end
end
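The combination of a mutex and `||=` gives each address exactly one pool, even when several threads ask at once. The following sketch shows the same memoization pattern in isolation; `PoolRegistry` is a hypothetical class, and a bare `Object` stands in for a connection pool.

```ruby
# Hypothetical registry illustrating lock-protected memoization; not driver code.
class PoolRegistry
  attr_reader :created

  def initialize
    @pools = {}
    @pool_lock = Mutex.new
    @created = 0
  end

  # Return the pool for the address, creating it on first request only.
  def pool(address)
    @pool_lock.synchronize do
      @pools[address] ||= begin
        @created += 1
        Object.new # stand-in for a real connection pool
      end
    end
  end
end

registry = PoolRegistry.new
a1 = registry.pool('127.0.0.1:27017')
a2 = registry.pool('127.0.0.1:27017')
b1 = registry.pool('127.0.0.1:27018')
```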

#read_retry_interval ⇒ Float

Get the interval, in seconds, in which a mongos read operation is retried.

Examples:

Get the read retry interval.

cluster.read_retry_interval

Returns:

  • (Float)

    The interval.

Since:

  • 2.1.1



# File 'lib/mongo/cluster.rb', line 319

def read_retry_interval
  options[:read_retry_interval] || READ_RETRY_INTERVAL
end

#reconnect! ⇒ true

Reconnect all servers.

Examples:

Reconnect the cluster's servers.

cluster.reconnect!

Returns:

  • (true)

    Always true.

Since:

  • 2.1.0



# File 'lib/mongo/cluster.rb', line 406

def reconnect!
  scan!
  servers.each { |server| server.reconnect! }
  @periodic_executor.restart! and true
end

#remove(host) ⇒ Object

Remove the server from the cluster for the provided address, if it exists.

Examples:

Remove the server from the cluster.

cluster.remove('127.0.0.1:27017')

Parameters:

  • host (String)

    The host/port or socket address.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 345

def remove(host)
  address = Address.new(host)
  removed_servers = @servers.select { |s| s.address == address }
  @update_lock.synchronize { @servers = @servers - removed_servers }
  removed_servers.each{ |server| server.disconnect! } if removed_servers
  publish_sdam_event(
    Monitoring::SERVER_CLOSED,
    Monitoring::Event::ServerClosed.new(address, topology)
  )
  @update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
end

#remove_hosts(description) ⇒ Object

Remove hosts in a description from the cluster.

Examples:

Remove hosts in a description from the cluster.

cluster.remove_hosts(description)

Parameters:

  • description (Server::Description)

    The server description.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 434

def remove_hosts(description)
  if topology.remove_hosts?(description)
    servers_list.each do |s|
      remove(s.address.to_s) if topology.remove_server?(description, s)
    end
  end
end

#scan! ⇒ true

Note:

This operation is done synchronously. If servers in the cluster are down or slow to respond, it can be slow.

Force a scan of all known servers in the cluster.

Examples:

Force a full cluster scan.

cluster.scan!

Returns:

  • (true)

    Always true.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 368

def scan!
  servers_list.each{ |server| server.scan! } and true
end

#servers ⇒ Array<Server>

Get a list of server candidates from the cluster that can have operations executed on them.

Examples:

Get the server candidates for an operation.

cluster.servers

Returns:

  • (Array<Server>)

    The candidate servers.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 381

def servers
  topology.servers(servers_list.compact).compact
end

#standalone_discovered ⇒ Topology

Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.

Examples:

Notify the cluster that a standalone server was discovered.

cluster.standalone_discovered

Returns:

  • (Topology)

    The cluster topology.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 332

def standalone_discovered
  @topology = topology.standalone_discovered
end

#update_cluster_time(result) ⇒ Object

Update the max cluster time seen in a response.

Examples:

Update the cluster time.

cluster.update_cluster_time(result)

Parameters:

  • result (Operation::Result)

    The operation result containing the cluster time.

Returns:

  • (Object)

    The cluster time.

Since:

  • 2.5.0



# File 'lib/mongo/cluster.rb', line 501

def update_cluster_time(result)
  if cluster_time_doc = result.cluster_time
    @cluster_time_lock.synchronize do
      if @cluster_time.nil?
        @cluster_time = cluster_time_doc
      elsif cluster_time_doc[CLUSTER_TIME] > @cluster_time[CLUSTER_TIME]
        @cluster_time = cluster_time_doc
      end
    end
  end
end
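The effect of this method is that the stored cluster time only ever moves forward: an older or missing value never replaces a newer one. A compact sketch of that rule is shown below. The `ClusterClock` class is hypothetical, and plain integers are assumed in place of the timestamp values the server actually returns.

```ruby
# Hypothetical class illustrating a forward-only cluster time; not driver code.
class ClusterClock
  CLUSTER_TIME = 'clusterTime'.freeze

  attr_reader :cluster_time

  def initialize
    @cluster_time = nil
    @lock = Mutex.new
  end

  # Store the document only if it is the first seen or newer than the current one.
  def update(doc)
    return unless doc
    @lock.synchronize do
      if @cluster_time.nil? || doc[CLUSTER_TIME] > @cluster_time[CLUSTER_TIME]
        @cluster_time = doc
      end
    end
  end
end

clock = ClusterClock.new
clock.update({ 'clusterTime' => 5 })
clock.update({ 'clusterTime' => 3 }) # older value is ignored
after_older = clock.cluster_time['clusterTime']
clock.update({ 'clusterTime' => 9 }) # newer value replaces the stored one
clock.update(nil)                    # a response without cluster time is ignored
after_newer = clock.cluster_time['clusterTime']
```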