Class: Mongo::Cluster

Inherits:
Object
Extended by:
Forwardable
Includes:
Event::Subscriber, Loggable, Monitoring::Publishable
Defined in:
lib/mongo/cluster.rb,
lib/mongo/cluster/topology.rb,
lib/mongo/cluster/app_metadata.rb,
lib/mongo/cluster/cursor_reaper.rb,
lib/mongo/cluster/topology/single.rb,
lib/mongo/cluster/topology/unknown.rb,
lib/mongo/cluster/topology/sharded.rb,
lib/mongo/cluster/topology/replica_set.rb

Overview

Represents a group of servers on the server side: a single server, a replica set, or one or more mongos instances.

Since:

  • 2.0.0

Defined Under Namespace

Modules: Topology

Classes: AppMetadata, CursorReaper

Constant Summary

MAX_READ_RETRIES = 1

The default number of mongos read retries.

Since:

  • 2.1.1

MAX_WRITE_RETRIES = 1

The default number of mongos write retries.

Since:

  • 2.4.2

READ_RETRY_INTERVAL = 5

The default mongos read retry interval, in seconds.

Since:

  • 2.1.1

IDLE_WRITE_PERIOD_SECONDS = 10

How often an idle primary writes a no-op to the oplog.

Since:

  • 2.4.0

Constants included from Loggable

Loggable::PREFIX

Instance Attribute Summary

Attributes included from Event::Subscriber

#event_listeners

Class Method Summary

Instance Method Summary

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Methods included from Event::Subscriber

#subscribe_to

Methods included from Monitoring::Publishable

#publish_command, #publish_event, #publish_sdam_event

Constructor Details

#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster

This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.

Note:

Cluster should never be directly instantiated outside of a Client.

Instantiate the new cluster.

Examples:

Instantiate the cluster.

Mongo::Cluster.new(["127.0.0.1:27017"], monitoring)

Parameters:

  • seeds (Array<String>)

    The addresses of the configured servers.

  • monitoring (Monitoring)

    The monitoring.

  • options (Hash) (defaults to: Options::Redacted.new)

    The options.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 152

def initialize(seeds, monitoring, options = Options::Redacted.new)
  @addresses = []
  @servers = []
  @monitoring = monitoring
  @event_listeners = Event::Listeners.new
  @options = options.freeze
  @app_metadata ||= AppMetadata.new(self)
  @update_lock = Mutex.new
  @pool_lock = Mutex.new
  @topology = Topology.initial(seeds, monitoring, options)

  publish_sdam_event(
    Monitoring::TOPOLOGY_OPENING,
    Monitoring::Event::TopologyOpening.new(@topology)
  )

  subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
  subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
  subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self))

  seeds.each{ |seed| add(seed) }

  publish_sdam_event(
    Monitoring::TOPOLOGY_CHANGED,
    Monitoring::Event::TopologyChanged.new(@topology, @topology)
  ) if @servers.size > 1

  @cursor_reaper = CursorReaper.new
  @cursor_reaper.run!

  ObjectSpace.define_finalizer(self, self.class.finalize(pools, @cursor_reaper))
end
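
The constructor wires event handlers to the cluster before adding the seeds, so that later discovery events can call back into `add`. The following is a simplified sketch of that wiring, not the driver's implementation: `SketchListeners`, `DiscoveryHandler`, `TinyCluster`, the `:member_discovered` symbol, and the `handle` method name are all assumptions made for illustration.

```ruby
# Hypothetical, simplified stand-ins; these are not the driver's Event classes.
class SketchListeners
  def initialize
    # Each event name maps to the list of handlers registered for it.
    @listeners = Hash.new { |hash, event| hash[event] = [] }
  end

  def add_listener(event, listener)
    @listeners[event] << listener
  end

  def publish(event, *args)
    @listeners[event].each { |listener| listener.handle(*args) }
  end
end

# Handler that holds a reference to the cluster and adds discovered members.
class DiscoveryHandler
  def initialize(cluster)
    @cluster = cluster
  end

  def handle(address)
    @cluster.add(address)
  end
end

class TinyCluster
  attr_reader :addresses

  def initialize(seeds)
    @addresses = []
    @listeners = SketchListeners.new
    # Subscribe first, then add the seeds, as in the listing above.
    @listeners.add_listener(:member_discovered, DiscoveryHandler.new(self))
    seeds.each { |seed| add(seed) }
  end

  # Adding an address that is already known is a no-op.
  def add(address)
    @addresses << address unless @addresses.include?(address)
  end

  def discover(address)
    @listeners.publish(:member_discovered, address)
  end
end

cluster = TinyCluster.new(["127.0.0.1:27017"])
cluster.discover("127.0.0.1:27018") # new member, gets added
cluster.discover("127.0.0.1:27017") # already known, ignored
```

Passing the cluster into each handler keeps the handlers small: they only translate an event into a call on the cluster.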

Instance Attribute Details

#app_metadata ⇒ Mongo::Cluster::AppMetadata (readonly)

Returns the application metadata, used for connection handshakes.

Returns:

  • (Mongo::Cluster::AppMetadata)

    The application metadata, used for connection handshakes.

Since:

  • 2.4.0



# File 'lib/mongo/cluster.rb', line 64

def app_metadata
  @app_metadata
end

#monitoring ⇒ Monitoring (readonly)

Returns the monitoring.

Returns:

  • (Monitoring)

    The monitoring.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 55

def monitoring
  @monitoring
end

#options ⇒ Hash (readonly)

Returns the options hash.

Returns:

  • (Hash)

    The options hash.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 52

def options
  @options
end

#topology ⇒ Object (readonly)

Returns the cluster topology.

Returns:

  • (Object)

    The cluster topology.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 58

def topology
  @topology
end

Class Method Details

.create(client) ⇒ Cluster

This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.

Create a new cluster for the provided client and assign it to the client, for use when the client should not share its original cluster instance.

Examples:

Create a cluster for the client.

Cluster.create(client)

Parameters:

  • client (Client)

    The client to create the cluster for.

Returns:

  • (Cluster)

    The newly created cluster, which is also assigned to the client.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 424

def self.create(client)
  cluster = Cluster.new(
    client.cluster.addresses.map(&:to_s),
    client.instance_variable_get(:@monitoring).dup,
    client.options
  )
  client.instance_variable_set(:@cluster, cluster)
end

.finalize(pools, cursor_reaper) ⇒ Proc

Build the finalizer used when the cluster is garbage collected. The returned Proc kills outstanding cursors, stops the cursor reaper, and disconnects all the scoped connection pools.

Examples:

Finalize the cluster.

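Cluster.finalize(pools, cursor_reaper)

Parameters:

  • pools (Hash)

    The connection pools, keyed by server address.

  • cursor_reaper (CursorReaper)

    The cursor reaper to stop.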

Returns:

  • (Proc)

    The Finalizer.

Since:

  • 2.2.0



# File 'lib/mongo/cluster.rb', line 197

def self.finalize(pools, cursor_reaper)
  proc do
    begin; cursor_reaper.kill_cursors; rescue; end
    cursor_reaper.stop!
    pools.values.each do |pool|
      pool.disconnect!
    end
  end
end
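
The finalizer is built by a class method, so the returned Proc closes over only the pools and the cursor reaper and not over the cluster itself. A finalizer that captured `self` would keep the object reachable and prevent it from being collected. A minimal sketch of the same pattern, using hypothetical stand-ins (`SketchPool`, `SketchReaper`, `Resource`) rather than the driver's classes:

```ruby
# Hypothetical stand-ins for the driver's connection pool and cursor reaper.
class SketchPool
  attr_reader :disconnected

  def disconnect!
    @disconnected = true
  end
end

class SketchReaper
  attr_reader :stopped

  def kill_cursors
    raise "server unreachable" # simulate a failure that must not abort cleanup
  end

  def stop!
    @stopped = true
  end
end

class Resource
  # Build the finalizer in a class method so the proc does not capture `self`.
  def self.finalize(pools, reaper)
    proc do
      begin
        reaper.kill_cursors
      rescue StandardError
        # Best effort only: ignore the error and continue cleaning up.
      end
      reaper.stop!
      pools.values.each(&:disconnect!)
    end
  end

  def initialize(pools, reaper)
    ObjectSpace.define_finalizer(self, self.class.finalize(pools, reaper))
  end
end

sketch_pools = { "127.0.0.1:27017" => SketchPool.new }
sketch_reaper = SketchReaper.new

# Call the finalizer directly to observe what it would do at collection time.
Resource.finalize(sketch_pools, sketch_reaper).call
```

Calling the Proc by hand, as in the last line, is a convenient way to exercise the cleanup path without waiting for the garbage collector.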

Instance Method Details

#==(other) ⇒ true, false

Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.

Examples:

Is the cluster equal to the object?

cluster == other

Parameters:

  • other (Object)

    The object to compare to.

Returns:

  • (true, false)

    If the objects are equal.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 81

def ==(other)
  return false unless other.is_a?(Cluster)
  addresses == other.addresses && options == other.options
end
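
The comparison above considers two clusters equal only when both the addresses and the options match, and it rejects any object that is not a Cluster. A self-contained sketch of the same logic, using a hypothetical `MiniCluster` stand-in with plain strings for addresses and made-up option values:

```ruby
# Hypothetical stand-in that mirrors the comparison shown above.
class MiniCluster
  attr_reader :addresses, :options

  def initialize(addresses, options = {})
    @addresses = addresses
    @options = options
  end

  def ==(other)
    return false unless other.is_a?(MiniCluster)
    addresses == other.addresses && options == other.options
  end
end

a = MiniCluster.new(["127.0.0.1:27017"], { connect: :direct })
b = MiniCluster.new(["127.0.0.1:27017"], { connect: :direct })
c = MiniCluster.new(["127.0.0.1:27017"], { connect: :sharded })

same_members   = (a == b)                 # same addresses, same options
different_opts = (a == c)                 # same addresses, different options
other_type     = (a == "127.0.0.1:27017") # not a MiniCluster at all
```

In this sketch the addresses are plain arrays, and Ruby array equality is order-sensitive, so the same addresses in a different order would compare as unequal.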

#add(host) ⇒ Server

Add a server to the cluster with the provided address. Useful for auto-discovery of new servers, when the ismaster response of an existing server lists servers that were not part of the configuration.

Examples:

Add the server for the address to the cluster.

cluster.add('127.0.0.1:27018')

Parameters:

  • host (String)

    The address of the server to add.

Returns:

  • (Server)

    The newly added server, or nil if the address is already present or addition is not allowed.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 98

def add(host)
  address = Address.new(host, options)
  if !addresses.include?(address)
    if addition_allowed?(address)
      @update_lock.synchronize { @addresses.push(address) }
      server = Server.new(address, self, @monitoring, event_listeners, options)
      @update_lock.synchronize { @servers.push(server) }
      server
    end
  end
end
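
Since the method returns nil when the address is already known, callers cannot assume they always get a Server back. A minimal sketch of the add-if-absent pattern, assuming a hypothetical `Registry` class that tracks plain address strings:

```ruby
# Hypothetical registry mirroring the add-if-absent pattern shown above.
class Registry
  def initialize
    @addresses = []
    @lock = Mutex.new
  end

  # Returns the address when it was added, nil when it was already present.
  def add(address)
    return nil if addresses.include?(address)
    @lock.synchronize { @addresses.push(address) }
    address
  end

  # Hand out a copy so callers cannot mutate the internal list.
  def addresses
    @lock.synchronize { @addresses.dup }
  end
end

registry = Registry.new
first  = registry.add("127.0.0.1:27018") # added, returns the address
second = registry.add("127.0.0.1:27018") # duplicate, returns nil
```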

#add_hosts(description) ⇒ Object

Add hosts in a description to the cluster.

Examples:

Add hosts in a description to the cluster.

cluster.add_hosts(description)

Parameters:

  • description (Mongo::Server::Description)

    The server description.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 389

def add_hosts(description)
  if topology.add_hosts?(description, servers_list)
    description.servers.each { |s| add(s) }
  end
end

#addresses ⇒ Array<Mongo::Address>

The addresses in the cluster.

Examples:

Get the addresses in the cluster.

cluster.addresses

Returns:

  • (Array<Mongo::Address>)

    The addresses in the cluster.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 441

def addresses
  addresses_list
end

#disconnect! ⇒ true

Disconnect all servers.

Examples:

Disconnect the cluster's servers.

cluster.disconnect!

Returns:

  • (true)

    Always true.

Since:

  • 2.1.0



# File 'lib/mongo/cluster.rb', line 361

def disconnect!
  begin; @cursor_reaper.kill_cursors; rescue; end
  @cursor_reaper.stop!
  @servers.each { |server| server.disconnect! } and true
end

#elect_primary!(description) ⇒ Topology

Elect a primary server from the description that has just changed to a primary.

Examples:

Elect a primary server.

cluster.elect_primary!(description)

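Parameters:

  • description (Mongo::Server::Description)

    The description of the server that has just changed to a primary.

Returns:

  • (Topology)

    The cluster topology.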

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 245

def elect_primary!(description)
  @topology = topology.elect_primary(description, servers_list)
end

#has_readable_server?(server_selector = nil) ⇒ true, false

Determine if the cluster would select a readable server for the provided read preference.

Examples:

Is a readable server present?

cluster.has_readable_server?(server_selector)

Parameters:

  • server_selector (ServerSelector) (defaults to: nil)

    The server selector.

Returns:

  • (true, false)

    If a readable server is present.

Since:

  • 2.4.0



# File 'lib/mongo/cluster.rb', line 122

def has_readable_server?(server_selector = nil)
  topology.has_readable_server?(self, server_selector)
end

#has_writable_server? ⇒ true, false

Determine if the cluster would select a writable server.

Examples:

Is a writable server present?

cluster.has_writable_server?

Returns:

  • (true, false)

    If a writable server is present.

Since:

  • 2.4.0



# File 'lib/mongo/cluster.rb', line 134

def has_writable_server?
  topology.has_writable_server?(self)
end

#inspect ⇒ String

Get a nicely formatted string for use in inspection.

Examples:

Inspect the cluster.

cluster.inspect

Returns:

  • (String)

    The cluster inspection.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 215

def inspect
  "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.display_name}>"
end

#max_read_retries ⇒ Integer

Get the maximum number of times the cluster can retry a read operation on a mongos.

Examples:

Get the max read retries.

cluster.max_read_retries

Returns:

  • (Integer)

    The maximum retries.

Since:

  • 2.1.1



# File 'lib/mongo/cluster.rb', line 258

def max_read_retries
  options[:max_read_retries] || MAX_READ_RETRIES
end
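
The `options[...] || DEFAULT` lookup falls back to the constant only when the option is missing or nil. Because 0 is truthy in Ruby, an explicit 0 is kept rather than replaced by the default. A small sketch, with the lookup rewritten as a hypothetical top-level helper that takes the options hash as an argument:

```ruby
MAX_READ_RETRIES = 1 # default value from the constant summary

# Hypothetical helper mirroring the `options[...] || DEFAULT` lookup.
def max_read_retries(options)
  options[:max_read_retries] || MAX_READ_RETRIES
end

default_value  = max_read_retries({})                        # option absent
explicit_value = max_read_retries({ max_read_retries: 3 })   # option set
zero_value     = max_read_retries({ max_read_retries: 0 })   # 0 is truthy
nil_value      = max_read_retries({ max_read_retries: nil }) # nil falls back
```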

#next_primary(ping = true) ⇒ Mongo::Server

Get the next primary server we can send an operation to.

Examples:

Get the next primary server.

cluster.next_primary

Parameters:

  • ping (true, false) (defaults to: true)

    Whether to ping the server before selection.

Returns:

  • (Mongo::Server)

    A primary server.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 229

def next_primary(ping = true)
  @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY)
  @primary_selector.select_server(self, ping)
end

#pool(server) ⇒ Server::ConnectionPool

Get the scoped connection pool for the server.

Examples:

Get the connection pool.

cluster.pool(server)

Parameters:

  • server (Server)

    The server.

Returns:

  • (Server::ConnectionPool)

    The connection pool.

Since:

  • 2.2.0



# File 'lib/mongo/cluster.rb', line 272

def pool(server)
  @pool_lock.synchronize do
    pools[server.address] ||= Server::ConnectionPool.get(server)
  end
end
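
Wrapping the `||=` in `synchronize` ensures that concurrent callers asking for the same address all receive one pool instead of racing to create several. A minimal sketch of that memoization, assuming hypothetical `CachedPool` and `PoolCache` stand-ins and a plain string as the key:

```ruby
# Hypothetical stand-in for a connection pool.
CachedPool = Struct.new(:address)

class PoolCache
  def initialize
    @pools = {}
    @lock = Mutex.new
  end

  # Create the pool on first request and reuse it afterwards.
  def pool(address)
    @lock.synchronize do
      @pools[address] ||= CachedPool.new(address)
    end
  end
end

cache = PoolCache.new

# Several threads request the pool for the same address at once.
threads = 8.times.map { Thread.new { cache.pool("127.0.0.1:27017") } }
results = threads.map(&:value)
```

Every element of `results` is the same object, which can be checked by comparing object ids.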

#read_retry_interval ⇒ Float

Get the interval, in seconds, to wait between retries of a mongos read operation.

Examples:

Get the read retry interval.

cluster.read_retry_interval

Returns:

  • (Float)

    The interval.

Since:

  • 2.1.1



# File 'lib/mongo/cluster.rb', line 287

def read_retry_interval
  options[:read_retry_interval] || READ_RETRY_INTERVAL
end
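
One way a retry count and a retry interval can be combined is sketched below. This is an illustration only and not the driver's retry logic: `with_retries`, its keyword arguments, and the choice of `IOError` as the retryable error are all assumptions.

```ruby
# Hypothetical retry helper: `max_retries` and `interval` play the roles of
# max_read_retries and read_retry_interval. This is not the driver's code.
def with_retries(max_retries:, interval:)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue IOError
    # Give up once the initial attempt plus all allowed retries have failed.
    raise if attempts > max_retries
    sleep(interval)
    retry
  end
end

calls = 0
result = with_retries(max_retries: 1, interval: 0.01) do |attempt|
  calls += 1
  raise IOError, "transient failure" if attempt == 1
  :ok
end
```

With one retry allowed, the block runs twice here: the first attempt fails and the second succeeds.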

#reconnect! ⇒ true

Reconnect all servers.

Examples:

Reconnect the cluster's servers.

cluster.reconnect!

Returns:

  • (true)

    Always true.

Since:

  • 2.1.0



# File 'lib/mongo/cluster.rb', line 375

def reconnect!
  scan!
  servers.each { |server| server.reconnect! }
  @cursor_reaper.restart! and true
end

#remove(host) ⇒ Object

Remove the server from the cluster for the provided address, if it exists.

Examples:

Remove the server from the cluster.

cluster.remove('127.0.0.1:27017')

Parameters:

  • host (String)

    The host/port or socket address.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 313

def remove(host)
  address = Address.new(host)
  removed_servers = @servers.select { |s| s.address == address }
  @update_lock.synchronize { @servers = @servers - removed_servers }
  removed_servers.each{ |server| server.disconnect! } if removed_servers
  publish_sdam_event(
    Monitoring::SERVER_CLOSED,
    Monitoring::Event::ServerClosed.new(address, topology)
  )
  @update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
end
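
The last expression in the listing is the `reject!` call inside `synchronize`, so the value returned by `remove` follows `Array#reject!`: the array itself when an element was dropped, nil when nothing matched. A minimal illustration on a plain array (the addresses are made up):

```ruby
addresses = ["127.0.0.1:27017", "127.0.0.1:27018"]

# An element matches, so reject! returns the modified array.
removed = addresses.reject! { |addr| addr == "127.0.0.1:27017" }

# Nothing matches, so reject! returns nil and leaves the array unchanged.
unchanged = addresses.reject! { |addr| addr == "10.0.0.1:27017" }
```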

#remove_hosts(description) ⇒ Object

Remove hosts in a description from the cluster.

Examples:

Remove hosts in a description from the cluster.

cluster.remove_hosts(description)

Parameters:

  • description (Mongo::Server::Description)

    The server description.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 403

def remove_hosts(description)
  if topology.remove_hosts?(description)
    servers_list.each do |s|
      remove(s.address.to_s) if topology.remove_server?(description, s)
    end
  end
end

#scan! ⇒ true

Note:

This operation is done synchronously. If servers in the cluster are down or slow to respond this can potentially be a slow operation.

Force a scan of all known servers in the cluster.

Examples:

Force a full cluster scan.

cluster.scan!

Returns:

  • (true)

    Always true.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 336

def scan!
  servers_list.each{ |server| server.scan! } and true
end
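
`Array#each` returns its receiver, which is truthy even when empty, so `each { ... } and true` always evaluates to true. A small sketch of that idiom, assuming a hypothetical `SketchServer` stand-in for Server:

```ruby
# Hypothetical stand-in for a server that records whether it was scanned.
SketchServer = Struct.new(:scanned) do
  def scan!
    self.scanned = true
  end
end

# Mirrors the shape of the listing: iterate, then return true.
def scan_all(servers)
  servers.each { |server| server.scan! } and true
end

servers = [SketchServer.new(false), SketchServer.new(false)]
scanned_result = scan_all(servers) # every server is scanned in turn
empty_result   = scan_all([])      # still true with no servers
```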

#servers ⇒ Array<Server>

Get a list of server candidates from the cluster that can have operations executed on them.

Examples:

Get the server candidates for an operation.

cluster.servers

Returns:

  • (Array<Server>)

    The candidate servers.

Since:

  • 2.0.0



# File 'lib/mongo/cluster.rb', line 349

def servers
  topology.servers(servers_list.compact).compact
end

#standalone_discovered ⇒ Topology

Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.

Examples:

Notify the cluster that a standalone server was discovered.

cluster.standalone_discovered

Returns:

  • (Topology)

    The cluster topology.

Since:

  • 2.0.6



# File 'lib/mongo/cluster.rb', line 300

def standalone_discovered
  @topology = topology.standalone_discovered
end