Class: Mongo::Collection::View::ChangeStream

Inherits:
Aggregation
  • Object
show all
Includes:
Retryable
Defined in:
lib/mongo/collection/view/change_stream.rb,
lib/mongo/collection/view/change_stream/retryable.rb

Overview

Note:

Only available in server versions 3.6 and higher.

Note:

ChangeStreams do not work properly with JRuby because of the issue documented here: github.com/jruby/jruby/issues/4212. Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread, therefore calling #next on the change stream will cause getMores to be called in a loop in the background.

Provides behaviour around a `$changeStream` pipeline stage in the aggregation framework. Specifying this stage allows users to request that notifications are sent for all changes to a particular collection or database.

Since:

  • 2.5.0

Defined Under Namespace

Modules: Retryable

Constant Summary collapse

FULL_DOCUMENT_DEFAULT =

Returns The fullDocument option default value.

Returns:

  • (String)

    The fullDocument option default value.

Since:

  • 2.5.0

'default'.freeze
DATABASE =

Returns Used to indicate that the change stream should listen for changes on the entire database rather than just the collection.

Returns:

  • (Symbol)

    Used to indicate that the change stream should listen for changes on the entire database rather than just the collection.

Since:

  • 2.6.0

:database
CLUSTER =

Returns Used to indicate that the change stream should listen for changes on the entire cluster rather than just the collection.

Returns:

  • (Symbol)

    Used to indicate that the change stream should listen for changes on the entire cluster rather than just the collection.

Since:

  • 2.6.0

:cluster

Constants inherited from Aggregation

Aggregation::REROUTE

Constants included from Loggable

Loggable::PREFIX

Constants included from Explainable

Explainable::ALL_PLANS_EXECUTION, Explainable::EXECUTION_STATS, Explainable::QUERY_PLANNER

Instance Attribute Summary collapse

Attributes inherited from Aggregation

#view

Instance Method Summary collapse

Methods inherited from Aggregation

#allow_disk_use, #explain

Methods included from Retryable

#read_with_one_retry, #read_with_retry, #write_with_retry

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Methods included from Explainable

#explain

Methods included from Iterable

#close_query

Constructor Details

#initialize(view, pipeline, changes_for, options = {}) ⇒ ChangeStream

Initialize the change stream for the provided collection view, pipeline and options.

Examples:

Create the new change stream view.

ChangeStream.new(view, pipeline, options)

Parameters:

  • view (Collection::View)

    The collection view.

  • pipeline (Array<Hash>)

    The pipeline of operators to filter the change notifications.

  • options (Hash) (defaults to: {})

    The change stream options.

Options Hash (options):

  • :full_document (String)

    Allowed values: 'default', 'updateLookup'. Defaults to 'default'. When set to 'updateLookup', the change notification for partial updates will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.

  • :resume_after (BSON::Document, Hash)

    Specifies the logical starting point for the new change stream.

  • :max_await_time_ms (Integer)

    The maximum amount of time for the server to wait on new documents to satisfy a change stream query.

  • :batch_size (Integer)

    The number of documents to return per batch.

  • :collation (BSON::Document, Hash)

    The collation to use.

  • :start_at_operation_time (BSON::Timestamp)

    Only return changes that occurred at or after the specified timestamp. Any command run against the server will return a cluster time that can be used here. Only recognized by server versions 4.0+.

Since:

  • 2.5.0



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/mongo/collection/view/change_stream.rb', line 86

def initialize(view, pipeline, changes_for, options = {})
  @view = view
  @changes_for = changes_for
  @change_stream_filters = pipeline && pipeline.dup
  @options = options && options.dup.freeze
  @resume_token = @options[:resume_after]
  create_cursor!

  # We send different parameters when we resume a change stream
  # compared to when we send the first query
  @resuming = true
end

Instance Attribute Details

#optionsBSON::Document (readonly)

Returns The change stream options.

Returns:

  • (BSON::Document)

    The change stream options.

Since:

  • 2.5.0



58
59
60
# File 'lib/mongo/collection/view/change_stream.rb', line 58

def options
  @options
end

Instance Method Details

#closenil

Close the change stream.

Examples:

Close the change stream.

stream.close

Returns:

  • (nil)

    nil.

Since:

  • 2.5.0



203
204
205
206
207
208
# File 'lib/mongo/collection/view/change_stream.rb', line 203

def close
  unless closed?
    begin; @cursor.send(:kill_cursors); rescue; end
    @cursor = nil
  end
end

#closed?true, false

Is the change stream closed?

Examples:

Determine whether the change stream is closed.

stream.closed?

Returns:

  • (true, false)

    If the change stream is closed.

Since:

  • 2.5.0



218
219
220
# File 'lib/mongo/collection/view/change_stream.rb', line 218

def closed?
  @cursor.nil?
end

#each {|Each| ... } ⇒ Enumerator

Iterate through documents returned by the change stream.

This method retries once per error on resumable errors (two consecutive errors result in the second error being raised, an error which is recovered from resets the error count to zero).

Examples:

Iterate through the stream of documents.

stream.each do |document|
  p document
end

Yield Parameters:

  • Each (BSON::Document)

    change stream document.

Returns:

  • (Enumerator)

    The enumerator.

Raises:

  • (StopIteration)

Since:

  • 2.5.0



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/mongo/collection/view/change_stream.rb', line 115

def each
  raise StopIteration.new if closed?
  retried = false
  begin
    @cursor.each do |doc|
      cache_resume_token(doc)
      yield doc
    end if block_given?
    @cursor.to_enum
  rescue Mongo::Error => e
    if retried || !e.change_stream_resumable?
      raise
    end

    retried = true
    # Rerun initial aggregation.
    # Any errors here will stop iteration and break out of this
    # method
    close
    create_cursor!
    retry
  end
end

#inspectString

Get a formatted string for use in inspection.

Examples:

Inspect the change stream object.

stream.inspect

Returns:

  • (String)

    The change stream inspection.

Since:

  • 2.5.0



230
231
232
233
# File 'lib/mongo/collection/view/change_stream.rb', line 230

def inspect
  "#<Mongo::Collection::View:ChangeStream:0x#{object_id} filters=#{@change_stream_filters} " +
    "options=#{@options} resume_token=#{@resume_token}>"
end

#to_enumObject

Since:

  • 2.5.0



184
185
186
187
188
189
190
191
192
193
# File 'lib/mongo/collection/view/change_stream.rb', line 184

def to_enum
  enum = super
  enum.send(:instance_variable_set, '@obj', self)
  class << enum
    def try_next
      @obj.try_next
    end
  end
  enum
end

#try_nextBSON::Document | nil

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

This method is experimental and subject to change.

Return one document from the change stream, if one is available.

Retries once on a resumable error.

Raises StopIteration if the change stream is closed.

This method will wait up to max_await_time_ms milliseconds for changes from the server, and if no changes are received it will return nil.

Returns:

  • (BSON::Document | nil)

    A change stream document.

Raises:

  • (StopIteration)

Since:

  • 2.5.0



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/mongo/collection/view/change_stream.rb', line 153

def try_next
  raise StopIteration.new if closed?
  retried = false

  begin
    doc = @cursor.try_next
  rescue Mongo::Error => e
    unless e.change_stream_resumable?
      raise
    end

    if retried
      # Rerun initial aggregation.
      # Any errors here will stop iteration and break out of this
      # method
      close
      create_cursor!
      retried = false
    else
      # Attempt to retry a getMore once
      retried = true
      retry
    end
  end

  if doc
    cache_resume_token(doc)
  end
  doc
end