Class: Mongo::Server::ConnectionPool

Inherits:
Object
  • Object
Extended by:
Forwardable
Includes:
Loggable, Monitoring::Publishable
Defined in:
lib/mongo/server/connection_pool.rb

Overview

Represents a connection pool for server connections.

Since:

  • 2.0.0, largely rewritten in 2.9.0

Constant Summary

DEFAULT_MAX_SIZE =

The default max size for the connection pool.

Since:

  • 2.9.0

5.freeze
DEFAULT_MIN_SIZE =

The default min size for the connection pool.

Since:

  • 2.9.0

0.freeze
DEFAULT_WAIT_TIMEOUT =

The default timeout, in seconds, to wait for a connection.

Since:

  • 2.9.0

1.freeze

Constants included from Loggable

Loggable::PREFIX

Instance Attribute Summary

Attributes included from Monitoring::Publishable

#monitoring

Instance Method Summary

Methods included from Monitoring::Publishable

#publish_cmap_event, #publish_event, #publish_sdam_event

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Constructor Details

#initialize(server, options = {}) ⇒ ConnectionPool

Create the new connection pool.

Parameters:

  • server (Server)

    The server which this connection pool is for.

  • options (Hash) (defaults to: {})

    The connection pool options.

Options Hash (options):

  • :max_size (Integer)

    The maximum pool size.

  • :max_pool_size (Integer)

    Deprecated. The maximum pool size. If max_size is also given, max_size and max_pool_size must be identical.

  • :min_size (Integer)

    The minimum pool size.

  • :min_pool_size (Integer)

    Deprecated. The minimum pool size. If min_size is also given, min_size and min_pool_size must be identical.

  • :wait_timeout (Float)

    The time to wait, in seconds, for a free connection.

  • :wait_queue_timeout (Float)

    Deprecated. Alias for :wait_timeout. If both wait_timeout and wait_queue_timeout are given, their values must be identical.

  • :max_idle_time (Float)

    The time, in seconds, after which idle connections should be closed by the pool.

Since:

  • 2.0.0, API changed in 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 63

def initialize(server, options = {})
  unless server.is_a?(Server)
    raise ArgumentError, 'First argument must be a Server instance'
  end
  options = options.dup
  if options[:min_size] && options[:min_pool_size] && options[:min_size] != options[:min_pool_size]
    raise ArgumentError, "Min size #{options[:min_size]} is not identical to min pool size #{options[:min_pool_size]}"
  end
  if options[:max_size] && options[:max_pool_size] && options[:max_size] != options[:max_pool_size]
    raise ArgumentError, "Max size #{options[:max_size]} is not identical to max pool size #{options[:max_pool_size]}"
  end
  if options[:wait_timeout] && options[:wait_queue_timeout] && options[:wait_timeout] != options[:wait_queue_timeout]
    raise ArgumentError, "Wait timeout #{options[:wait_timeout]} is not identical to wait queue timeout #{options[:wait_queue_timeout]}"
  end
  options[:min_size] ||= options[:min_pool_size]
  options.delete(:min_pool_size)
  options[:max_size] ||= options[:max_pool_size]
  options.delete(:max_pool_size)
  if options[:min_size] && options[:max_size] &&
    options[:min_size] > options[:max_size]
  then
    raise ArgumentError, "Cannot have min size #{options[:min_size]} exceed max size #{options[:max_size]}"
  end
  if options[:wait_queue_timeout]
    options[:wait_timeout] ||= options[:wait_queue_timeout]
  end
  options.delete(:wait_queue_timeout)

  @server = server
  @options = options.freeze

  @generation = 1
  @closed = false

  # A connection owned by this pool should be either in the
  # available connections array (which is used as a stack)
  # or in the checked out connections set.
  @available_connections = available_connections = []
  @checked_out_connections = Set.new

  # Mutex used for synchronizing access to @available_connections and
  # @checked_out_connections. The pool object is thread-safe, thus
  # all methods that retrieve or modify instance variables generally
  # must do so under this lock.
  @lock = Mutex.new

  # Condition variable broadcast when a connection is added to
  # @available_connections, to wake up any threads waiting for an
  # available connection when pool is at max size
  @available_semaphore = Semaphore.new

  finalizer = proc do
    available_connections.each do |connection|
      connection.disconnect!(reason: :pool_closed)
    end
    available_connections.clear
    # Finalizer does not close checked out connections.
    # Those would have to be garbage collected on their own
    # and that should close them.
  end
  ObjectSpace.define_finalizer(self, finalizer)

  publish_cmap_event(
    Monitoring::Event::Cmap::PoolCreated.new(@server.address, options)
  )
end
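
The handling of the deprecated option names can be illustrated in isolation. The following is a minimal sketch, not the driver's code: normalize_pool_options and POOL_OPTION_ALIASES are hypothetical names, and the helper works on a plain Hash so it runs without a Server instance. It follows the checks in the listing above, except that it leaves unset options absent instead of storing nil for them.

```ruby
# Hypothetical mapping of deprecated option names to their replacements,
# taken from the options list documented for ConnectionPool#initialize.
POOL_OPTION_ALIASES = {
  min_pool_size: :min_size,
  max_pool_size: :max_size,
  wait_queue_timeout: :wait_timeout,
}.freeze

# Hypothetical helper (not part of the driver): folds deprecated names into
# the current ones and rejects conflicting or inconsistent values.
def normalize_pool_options(options)
  options = options.dup
  POOL_OPTION_ALIASES.each do |old_name, new_name|
    old_value = options.delete(old_name)
    next if old_value.nil?

    new_value = options[new_name]
    if new_value && new_value != old_value
      raise ArgumentError, "#{new_name} #{new_value} is not identical to #{old_name} #{old_value}"
    end
    options[new_name] ||= old_value
  end

  if options[:min_size] && options[:max_size] && options[:min_size] > options[:max_size]
    raise ArgumentError, "Cannot have min size #{options[:min_size]} exceed max size #{options[:max_size]}"
  end
  options.freeze
end

normalized = normalize_pool_options(max_pool_size: 10, wait_queue_timeout: 2.5)
# normalized now uses :max_size and :wait_timeout instead of the deprecated keys.
```

Passing both spellings with different values, for example min_size: 1 together with min_pool_size: 2, raises ArgumentError in this sketch, as it does in the constructor.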

Instance Attribute Details

#generation ⇒ Integer (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the generation of connections currently being used by the queue.

Returns:

  • (Integer)

    The generation of connections currently being used by the queue.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 175

def generation
  @generation
end

#options ⇒ Hash (readonly)

Returns the pool options.

Returns:

  • (Hash)

    The pool options.

Since:

  • 2.0.0, largely rewritten in 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 131

def options
  @options
end

Instance Method Details

#available_count ⇒ Integer

Number of available connections in the pool.

Returns:

  • (Integer)

    Number of available connections.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 206

def available_count
  raise_if_closed!

  @lock.synchronize do
    @available_connections.length
  end
end

#check_in(connection) ⇒ Object

Check a connection back into the pool.

The connection must have been previously created by this pool.

Parameters:

  • connection (Mongo::Server::Connection)

    The connection to check in.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 319

def check_in(connection)
  @lock.synchronize do
    unless @checked_out_connections.include?(connection)
      raise ArgumentError, "Trying to check in a connection which is not currently checked out by this pool: #{connection}"
    end

    @checked_out_connections.delete(connection)

    # Note: if an event handler raises, resource will not be signaled.
    # This means threads waiting for a connection to free up when
    # the pool is at max size may time out.
    # Threads that begin waiting after this method completes (with
    # the exception) should be fine.
    publish_cmap_event(
      Monitoring::Event::Cmap::ConnectionCheckedIn.new(@server.address, connection.id)
    )

    if closed?
      connection.disconnect!(reason: :pool_closed)
      return
    end

    if connection.closed?
      # Connection was closed - for example, because it experienced
      # a network error. Nothing else needs to be done here.
    elsif connection.generation != @generation
      connection.disconnect!(reason: :stale)
    else
      connection.record_checkin!
      @available_connections << connection

      # Wake up only one thread waiting for an available connection,
      # since only one connection was checked in.
      @available_semaphore.signal
    end
  end
end

#check_out ⇒ Mongo::Server::Connection

Checks a connection out of the pool.

If there are available connections in the pool, the most recently used connection is returned. Otherwise, if the connection pool size is less than the max size, a new connection is created and returned. Otherwise, the method waits up to the wait timeout and raises Timeout::Error if there are still no available connections and the pool is at max size.

The returned connection counts toward the pool's max size. When the caller is finished using the connection, the connection should be checked back in via the check_in method.

Returns:

  • (Mongo::Server::Connection)

    The checked out connection.

Raises:

  • (Timeout::Error)

    If the connection pool is at maximum size and remains so for longer than the wait timeout.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 243

def check_out
  raise_if_closed!

  publish_cmap_event(
    Monitoring::Event::Cmap::ConnectionCheckOutStarted.new(@server.address)
  )

  deadline = Time.now + wait_timeout
  connection = nil
  # It seems that synchronize sets up its own loop, thus a simple break
  # is insufficient to break the outer loop
  catch(:done) do
    loop do
      # Lock must be taken on each iteration, rather than for the method
      # overall, otherwise other threads will not be able to check in
      # a connection while this thread is waiting for one.
      @lock.synchronize do
        until @available_connections.empty?
          connection = @available_connections.pop

          if connection.generation != generation
            # Stale connections should be disconnected in the clear
            # method, but if any don't, check again here
            connection.disconnect!(reason: :stale)
            next
          end

          if max_idle_time && connection.last_checkin &&
            Time.now - connection.last_checkin > max_idle_time
          then
            connection.disconnect!(reason: :idle)
            next
          end

          throw(:done)
        end

        # Ruby does not allow a thread to lock a mutex which it already
        # holds.
        if unsynchronized_size < max_size
          # This does not currently connect the socket and handshake,
          # but if it did, it would be performing i/o under our lock,
          # which is bad. Fix in the future.
          connection = create_connection
          throw(:done)
        end
      end

      wait = deadline - Time.now
      if wait <= 0
        publish_cmap_event(
          Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
            @server.address,
            Monitoring::Event::Cmap::ConnectionCheckOutFailed::TIMEOUT,
          ),
        )
        raise Error::ConnectionCheckOutTimeout.new(@server.address, wait_timeout)
      end
      @available_semaphore.wait(wait)
    end
  end

  @checked_out_connections << connection
  publish_cmap_event(
    Monitoring::Event::Cmap::ConnectionCheckedOut.new(@server.address, connection.id),
  )
  connection
end
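
The check-out order described above (reuse the most recently checked-in connection, otherwise create one while under max size, otherwise wait until a deadline) can be sketched with a much smaller pool. This is an illustration under stated assumptions, not the driver's implementation: TinyPool is a hypothetical class, its connections are whatever the factory block returns, and it waits on a standard ConditionVariable, whereas the listing above uses the driver's own Semaphore. It also omits events, generations, and idle-time checks.

```ruby
require 'timeout'

# Hypothetical, simplified pool (not the driver's class).
class TinyPool
  def initialize(max_size:, wait_timeout:, &factory)
    @max_size = max_size
    @wait_timeout = wait_timeout
    @factory = factory
    @available = []      # used as a stack: last checked in, first reused
    @checked_out = 0
    @lock = Mutex.new
    @freed = ConditionVariable.new
  end

  def check_out
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @wait_timeout
    @lock.synchronize do
      loop do
        # 1. Prefer an available connection.
        unless @available.empty?
          @checked_out += 1
          return @available.pop
        end
        # 2. Otherwise create one if the pool is below max size.
        if @checked_out < @max_size
          @checked_out += 1
          return @factory.call
        end
        # 3. Otherwise wait for a check-in until the deadline passes.
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise Timeout::Error, "no connection within #{@wait_timeout}s" if remaining <= 0
        # ConditionVariable#wait releases the lock while sleeping, so other
        # threads can check connections in.
        @freed.wait(@lock, remaining)
      end
    end
  end

  def check_in(connection)
    @lock.synchronize do
      @checked_out -= 1
      @available << connection
      @freed.signal      # one connection was freed, so wake one waiter
    end
  end
end

pool = TinyPool.new(max_size: 1, wait_timeout: 0.1) { Object.new }
first = pool.check_out
begin
  pool.check_out         # pool is at max size: waits, then raises
rescue Timeout::Error
  # expected in this sketch
end
pool.check_in(first)
reused = pool.check_out  # the same object as `first`
```

The deadline is computed once, before the loop, so a thread that is woken but loses the race for the freed connection waits only for the time that remains rather than restarting the full timeout.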

#clear(options = nil) ⇒ true Also known as: disconnect!

Closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool. The pool remains operational and can create new connections when requested.

Parameters:

  • options (Hash) (defaults to: nil)

    a customizable set of options

Options Hash (options):

  • :lazy (true | false)

    If true, do not close any of the idle connections and instead let them be closed during a subsequent check out operation.

Returns:

  • (true)

    true.

Since:

  • 2.1.0



# File 'lib/mongo/server/connection_pool.rb', line 369

def clear(options = nil)
  raise_if_closed!

  @lock.synchronize do
    @generation += 1

    publish_cmap_event(
      Monitoring::Event::Cmap::PoolCleared.new(@server.address)
    )

    unless options && options[:lazy]
      until @available_connections.empty?
        connection = @available_connections.pop
        connection.disconnect!(reason: :stale)
      end
    end
  end

  true
end
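
The generation counter is what lets clear invalidate checked-out connections without touching them. A minimal single-threaded sketch of the idea follows, assuming hypothetical stand-ins (Conn and GenerationPool are not driver classes, and locking and events are left out): each connection remembers the generation it was created in, and check-in discards any connection whose generation no longer matches the pool's.

```ruby
# Hypothetical connection stand-in: records the generation it was created in.
Conn = Struct.new(:id, :generation, :closed) do
  def disconnect!
    self.closed = true
  end
end

# Hypothetical, single-threaded pool showing only the generation logic.
class GenerationPool
  attr_reader :generation, :available

  def initialize
    @generation = 1
    @available = []
    @next_id = 0
  end

  def create
    Conn.new(@next_id += 1, @generation, false)
  end

  # Bumps the generation. Unless lazy, idle connections are closed right away;
  # checked-out connections are dealt with when they come back.
  def clear(lazy: false)
    @generation += 1
    return true if lazy

    @available.pop.disconnect! until @available.empty?
    true
  end

  # A connection from an older generation is closed instead of being reused.
  def check_in(conn)
    if conn.generation != @generation
      conn.disconnect!
    else
      @available << conn
    end
  end
end

pool = GenerationPool.new
idle = pool.create
busy = pool.create
pool.check_in(idle)    # idle now sits in the pool
pool.clear             # closes idle; generation becomes 2
pool.check_in(busy)    # busy belongs to generation 1, so it is closed
fresh = pool.create    # created in generation 2
pool.check_in(fresh)   # kept for reuse
```

With lazy: true, this sketch leaves stale idle connections in place; in the driver they are weeded out by the generation check in check_out shown earlier.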

#close(options = nil) ⇒ true

Marks the pool closed, closes all idle connections in the pool and schedules currently checked out connections to be closed when they are checked back into the pool. If force option is true, checked out connections are also closed. Attempts to use the pool after it is closed will raise Error::PoolClosedError.

Parameters:

  • options (Hash) (defaults to: nil)

    a customizable set of options

Options Hash (options):

  • :force (true | false)

    Also close all checked out connections.

Returns:

  • (true)

    true.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 406

def close(options = nil)
  return if closed?

  @lock.synchronize do
    until @available_connections.empty?
      connection = @available_connections.pop
      connection.disconnect!(reason: :pool_closed)
    end

    if options && options[:force]
      until @checked_out_connections.empty?
        connection = @checked_out_connections.take(1).first
        connection.disconnect!(reason: :pool_closed)
        @checked_out_connections.delete(connection)
      end
    end
  end

  @closed = true

  publish_cmap_event(
    Monitoring::Event::Cmap::PoolClosed.new(@server.address)
  )

  true
end

#close_idle_sockets ⇒ Object

Closes sockets that have been idle since check-in for longer than the max idle time, if the option is set.

Since:

  • 2.5.0



# File 'lib/mongo/server/connection_pool.rb', line 476

def close_idle_sockets
  return if closed?
  return unless max_idle_time

  @lock.synchronize do
    i = 0
    while i < @available_connections.length
      connection = @available_connections[i]
      if last_checkin = connection.last_checkin
        if (Time.now - last_checkin) > max_idle_time
          connection.disconnect!(reason: :idle)
          @available_connections.delete_at(i)
          next
        end
      end
      i += 1
    end
  end
end
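
The idle check compares the time since a connection's last check-in against max_idle_time, and connections that were never checked in are left alone. A minimal sketch of the same rule on a plain Array follows; IdleConn and prune_idle! are hypothetical names, and the sketch uses Array#reject! instead of the index loop in the listing above.

```ruby
# Hypothetical connection stand-in with only the fields the check needs.
IdleConn = Struct.new(:last_checkin, :closed) do
  def disconnect!
    self.closed = true
  end
end

# Hypothetical helper: disconnects and removes every connection whose last
# check-in is more than max_idle_time seconds before `now`. Connections with
# no recorded check-in (last_checkin is nil) are kept.
def prune_idle!(available, max_idle_time, now: Time.now)
  available.reject! do |conn|
    idle = conn.last_checkin && (now - conn.last_checkin) > max_idle_time
    conn.disconnect! if idle
    idle
  end
  available   # reject! returns nil when nothing was removed
end

now = Time.now
conns = [
  IdleConn.new(now - 10, false),  # idle for 10 seconds
  IdleConn.new(now - 1, false),   # idle for 1 second
  IdleConn.new(nil, false),       # never checked in
]
kept = prune_idle!(conns.dup, 5, now: now)
# kept holds the second and third connections; the first is disconnected.
```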

#closed? ⇒ true | false

Whether the pool has been closed.

Returns:

  • (true | false)

    Whether the pool is closed.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 219

def closed?
  !!@closed
end

#inspect ⇒ String

Get a pretty printed string inspection for the pool.

Examples:

Inspect the pool.

pool.inspect

Returns:

  • (String)

    The pool inspection.

Since:

  • 2.0.0



# File 'lib/mongo/server/connection_pool.rb', line 441

def inspect
  if closed?
    "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
      "wait_timeout=#{wait_timeout} closed>"
  else
    "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
      "wait_timeout=#{wait_timeout} current_size=#{size} available=#{available_count}>"
  end
end

#max_idle_time ⇒ Float | nil

The maximum time, in seconds, that a socket can remain idle after being checked in to the pool, if set.

Returns:

  • (Float | nil)

    The max socket idle time in seconds.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 166

def max_idle_time
  @max_idle_time ||= options[:max_idle_time]
end

#max_size ⇒ Integer

Get the maximum size of the connection pool.

Returns:

  • (Integer)

    The maximum size of the connection pool.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 138

def max_size
  @max_size ||= options[:max_size] || [DEFAULT_MAX_SIZE, min_size].max
end
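
One consequence of this default is easy to miss: when only a min size is given and it exceeds DEFAULT_MAX_SIZE, the default max size is raised to match it. The sketch below reproduces that resolution on a plain Hash. effective_sizes is a hypothetical helper, not a driver method, and the constants are redefined locally with the values listed in the Constant Summary.

```ruby
# Values copied from the Constant Summary above.
DEFAULT_MAX_SIZE = 5
DEFAULT_MIN_SIZE = 0

# Hypothetical helper: returns [min_size, max_size] resolved the same way as
# the #min_size and #max_size readers.
def effective_sizes(options = {})
  min = options[:min_size] || DEFAULT_MIN_SIZE
  max = options[:max_size] || [DEFAULT_MAX_SIZE, min].max
  [min, max]
end

effective_sizes                            # => [0, 5]
effective_sizes(min_size: 8)               # => [8, 8]
effective_sizes(min_size: 2, max_size: 3)  # => [2, 3]
```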

#min_size ⇒ Integer

Get the minimum size of the connection pool.

Returns:

  • (Integer)

    The minimum size of the connection pool.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 147

def min_size
  @min_size ||= options[:min_size] || DEFAULT_MIN_SIZE
end

#populate ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Creates connections until the pool reaches its min size.

Used by the spec test runner.

Since:

  • 2.0.0, largely rewritten in 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 501

def populate
  while size < min_size
    @available_connections << create_connection
  end
end

#size ⇒ Integer

Size of the connection pool.

Includes available and checked out connections.

Returns:

  • (Integer)

    Size of the connection pool.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 184

def size
  raise_if_closed!

  @lock.synchronize do
    unsynchronized_size
  end
end

#wait_timeout ⇒ Float

The time to wait, in seconds, for a connection to become available.

Returns:

  • (Float)

    The queue wait timeout.

Since:

  • 2.9.0



# File 'lib/mongo/server/connection_pool.rb', line 156

def wait_timeout
  @wait_timeout ||= options[:wait_timeout] || DEFAULT_WAIT_TIMEOUT
end

#with_connection ⇒ Object

Yields a connection to the block, while handling the check out/check in logic.

Examples:

Execute with a connection.

pool.with_connection do |connection|
  connection.read
end

Returns:

  • (Object)

    The result of the block.

Since:

  • 2.0.0



# File 'lib/mongo/server/connection_pool.rb', line 461

def with_connection
  raise_if_closed!

  connection = check_out
  yield(connection)
ensure
  if connection
    check_in(connection)
  end
end
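
The ensure clause is what guarantees the connection goes back to the pool even when the block raises. A minimal sketch of the pattern follows, assuming a hypothetical LoanPool class whose "connections" are plain symbols; it leaves out locking, the closed check, and everything else the real pool does.

```ruby
# Hypothetical pool with just enough behaviour to show the pattern.
class LoanPool
  attr_reader :available

  def initialize(connections)
    @available = connections
  end

  def check_out
    @available.pop
  end

  def check_in(connection)
    @available << connection
  end

  def with_connection
    connection = check_out
    yield(connection)
  ensure
    # Runs whether the block returned normally or raised.
    check_in(connection) if connection
  end
end

pool = LoanPool.new([:conn_a])
result = pool.with_connection { |c| "used #{c}" }

begin
  pool.with_connection { |_c| raise IOError, "network error" }
rescue IOError
  # The error propagates to the caller, but the connection is already back.
end
```

The `if connection` guard matters because check_out itself can raise, in which case there is nothing to check in.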