Class: Mongo::Server::ConnectionPool::Queue

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Loggable
Defined in:
lib/mongo/server/connection_pool/queue.rb

Overview

Note:

The queue contains active connections that are available for use. It does not track connections which are in use (checked out). It is easy to confuse the size of the connection pool (number of connections that are used plus number of connections that are available for use) and the size of the queue (number of connections that have already been created that are available for use). API documentation for this class states whether each size refers to the pool or to the queue size. Note that minimum and maximum sizes only make sense when talking about the connection pool, as the size of the queue of available connections is determined by the size constraints of the pool plus how many connections are currently checked out.

A LIFO queue of connections to be used by the connection pool. This is based on mperham's connection pool.

Since:

  • 2.0.0

Constant Summary collapse

MAX_SIZE =

The default max size for the connection pool.

Since:

  • 2.0.0

5.freeze
MIN_SIZE =

The default min size for the connection pool.

Since:

  • 2.0.0

1.freeze
WAIT_TIMEOUT =

The default timeout, in seconds, to wait for a connection.

Since:

  • 2.0.0

1.freeze

Constants included from Loggable

Loggable::PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Constructor Details

#initialize(options = {}, &block) ⇒ Queue

Initialize the new queue. Will yield the block the number of times equal to the initial connection pool size.

Examples:

Create the queue.

Mongo::Server::ConnectionPool::Queue.new(max_pool_size: 5) { Connection.new }

Parameters:

  • options (Hash) (defaults to: {})

    The options.

Options Hash (options):

  • :max_pool_size (Integer)

    The maximum pool size.

  • :min_pool_size (Integer)

    The minimum pool size.

  • :wait_queue_timeout (Float)

    The time to wait, in seconds, for a free connection.

Since:

  • 2.0.0



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/mongo/server/connection_pool/queue.rb', line 63

def initialize(options = {}, &block)
  if options[:min_pool_size] && options[:max_pool_size] &&
    options[:min_pool_size] > options[:max_pool_size]
  then
    raise ArgumentError, "Cannot have min size > max size"
  end
  @block = block
  # This is the number of connections in the pool.
  # Includes available connections in the queue and the checked
  # out connections that we don't otherwise track.
  @pool_size = 0
  @options = options
  @generation = 1
  if min_size > max_size
    raise ArgumentError, "min_size (#{min_size}) cannot exceed max_size (#{max_size})"
  end
  @queue = Array.new(min_size) { create_connection }
  @mutex = Mutex.new
  @resource = ConditionVariable.new
  check_count_invariants
end

Instance Attribute Details

#generationInteger (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns generation Generation of connections currently being used by the queue.

Returns:

  • (Integer)

    generation Generation of connections currently being used by the queue.

Since:

  • 2.7.0



90
91
92
# File 'lib/mongo/server/connection_pool/queue.rb', line 90

def generation
  @generation
end

#mutexMutex (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns mutex The mutex used for synchronization of access to #queue.

Returns:

  • (Mutex)

    mutex The mutex used for synchronization of access to #queue.

Since:

  • 2.0.0



99
100
101
# File 'lib/mongo/server/connection_pool/queue.rb', line 99

def mutex
  @mutex
end

#optionsHash (readonly)

Returns options The options.

Returns:

  • (Hash)

    options The options.

Since:

  • 2.0.0



102
103
104
# File 'lib/mongo/server/connection_pool/queue.rb', line 102

def options
  @options
end

#pool_sizeObject (readonly)

Number of connections in the pool (active connections ready to be checked out plus connections already checked out).

Since:

  • 2.7.0



126
127
128
# File 'lib/mongo/server/connection_pool/queue.rb', line 126

def pool_size
  @pool_size
end

#queueArray (readonly)

Returns queue The underlying array of connections.

Returns:

  • (Array)

    queue The underlying array of connections.

Since:

  • 2.0.0



93
94
95
# File 'lib/mongo/server/connection_pool/queue.rb', line 93

def queue
  @queue
end

#resourceConditionVariable (readonly)

Returns resource The resource.

Returns:

  • (ConditionVariable)

    resource The resource.

Since:

  • 2.0.0



105
106
107
# File 'lib/mongo/server/connection_pool/queue.rb', line 105

def resource
  @resource
end

Instance Method Details

#close_stale_sockets!Object

Close sockets that have been open for longer than the max idle time,

if the option is set.

Examples:

Close the stale sockets

queue.close_stale_sockets!

Since:

  • 2.5.0



294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/mongo/server/connection_pool/queue.rb', line 294

def close_stale_sockets!
  check_count_invariants
  return unless max_idle_time

  mutex.synchronize do
    i = 0
    while i < queue.length
      connection = queue[i]
      if last_checkin = connection.last_checkin
        if (Time.now - last_checkin) > max_idle_time
          connection.disconnect!
          queue.delete_at(i)
          @pool_size -= 1
          next
        end
      end
      i += 1
    end
  end
ensure
  check_count_invariants
end

#dequeueMongo::Server::Connection

Retrieves a connection. If there are active connections in the queue, the most recently used connection is returned. Otherwise if the connection pool size is less than the max size, creates a new connection and returns it. Otherwise raises Timeout::Error.

Examples:

Dequeue a connection.

queue.dequeue

Returns:

Raises:

  • (Timeout::Error)

    If the connection pool is at maximum size and remains so for longer than the wait timeout.

Since:

  • 2.0.0



141
142
143
144
145
146
# File 'lib/mongo/server/connection_pool/queue.rb', line 141

def dequeue
  check_count_invariants
  dequeue_connection
ensure
  check_count_invariants
end

#disconnect!true

Closes all idle connections in the queue and schedules currently dequeued connections to be closed when they are enqueued back into the queue. The queue remains operational and can create new connections when requested.

Examples:

Disconnect all connections.

queue.disconnect!

Returns:

  • (true)

    Always true.

Since:

  • 2.1.0



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/mongo/server/connection_pool/queue.rb', line 159

def disconnect!
  check_count_invariants
  mutex.synchronize do
    while connection = queue.pop
      connection.disconnect!
      @pool_size -= 1
      if @pool_size < 0
        # This should never happen
        log_warn("ConnectionPool::Queue: connection accounting problem")
        @pool_size = 0
      end
    end
    @generation += 1
    true
  end
ensure
  check_count_invariants
end

#enqueue(connection) ⇒ Object

Enqueue a connection in the queue.

Only connections created by this queue should be enqueued back into it, however the queue does not verify whether it originally created the connection being enqueued.

If linting is enabled (see Mongo::Lint), attempting to enqueue connections beyond the pool's capacity will raise Mongo::Error::LintError (since some of those connections must not have originated from the queue into which they are being enqueued). If linting is not enabled, the queue can grow beyond its max size with undefined results.

Examples:

Enqueue a connection.

queue.enqueue(connection)

Parameters:

Since:

  • 2.0.0



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/mongo/server/connection_pool/queue.rb', line 197

def enqueue(connection)
  check_count_invariants
  mutex.synchronize do
    if connection.generation == @generation
      queue.unshift(connection.record_checkin!)
      resource.broadcast
    else
      connection.disconnect!

      @pool_size = if @pool_size > 0
        @pool_size - 1
      else
        # This should never happen
        log_warn("ConnectionPool::Queue: unexpected enqueue")
        0
      end

      while @pool_size < min_size
        @pool_size += 1
        queue.unshift(@block.call(@generation))
      end
    end
  end
  nil
ensure
  check_count_invariants
end

#inspectString

Get a pretty printed string inspection for the queue.

Examples:

Inspect the queue.

queue.inspect

Returns:

  • (String)

    The queue inspection.

Since:

  • 2.0.0



233
234
235
236
# File 'lib/mongo/server/connection_pool/queue.rb', line 233

def inspect
  "#<Mongo::Server::ConnectionPool::Queue:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
    "wait_timeout=#{wait_timeout} current_size=#{queue_size}>"
end

#max_idle_timeFloat

The maximum seconds a socket can remain idle since it has been checked in to the pool.

Examples:

Get the max idle time.

queue.max_idle_time

Returns:

  • (Float)

    The max socket idle time in seconds.

Since:

  • 2.5.0



283
284
285
# File 'lib/mongo/server/connection_pool/queue.rb', line 283

def max_idle_time
  @max_idle_time ||= options[:max_idle_time]
end

#max_sizeInteger

Get the maximum size of the connection pool.

Examples:

Get the max size.

queue.max_size

Returns:

  • (Integer)

    The maximum size of the connection pool.

Since:

  • 2.0.0



246
247
248
# File 'lib/mongo/server/connection_pool/queue.rb', line 246

def max_size
  @max_size ||= options[:max_pool_size] || [MAX_SIZE, min_size].max
end

#min_sizeInteger

Get the minimum size of the connection pool.

Examples:

Get the min size.

queue.min_size

Returns:

  • (Integer)

    The minimum size of the connection pool.

Since:

  • 2.0.0



258
259
260
# File 'lib/mongo/server/connection_pool/queue.rb', line 258

def min_size
  @min_size ||= options[:min_pool_size] || MIN_SIZE
end

#sizeObject Also known as: queue_size

Number of connections that the pool has which are ready to be checked out. This is NOT the size of the connection pool (total number of active connections created by the pool).

Since:

  • 2.0.0



110
111
112
113
114
# File 'lib/mongo/server/connection_pool/queue.rb', line 110

def size
  mutex.synchronize do
    queue.size
  end
end

#wait_timeoutFloat

The time to wait, in seconds, for a connection to become available.

Examples:

Get the wait timeout.

queue.wait_timeout

Returns:

  • (Float)

    The queue wait timeout.

Since:

  • 2.0.0



270
271
272
# File 'lib/mongo/server/connection_pool/queue.rb', line 270

def wait_timeout
  @wait_timeout ||= options[:wait_queue_timeout] || WAIT_TIMEOUT
end