Skip to content

Commit 0b8a852

Browse files
DOCSP-37539 Change Streams (#89)
(cherry picked from commit 410e265)
1 parent 97138a6 commit 0b8a852

File tree

4 files changed

+320
-1
lines changed

4 files changed

+320
-1
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# start-open-change-stream
2+
database = client["sample_restaurants"]
3+
collection = database["restaurants"]
4+
5+
with collection.watch() as stream:
6+
for change in stream:
7+
print(change)
8+
# end-open-change-stream
9+
10+
# start-update-for-change-stream
11+
database = client["sample_restaurants"]
12+
collection = database["restaurants"]
13+
14+
query_filter = { "name": "Blarney Castle" }
15+
update_operation = { '$set' :
16+
{ "cuisine": "Irish" }
17+
}
18+
19+
result = collection.update_one(query_filter, update_operation)
20+
# end-update-for-change-stream
21+
22+
# start-change-stream-pipeline
23+
24+
change_pipeline = { "$match": { "operationType": "update" }},
25+
26+
with collection.watch(pipeline=change_pipeline) as stream:
27+
for change in stream:
28+
print(change)
29+
# end-change-stream-pipeline
30+
31+
# start-change-stream-post-image
32+
database = client["sample_restaurants"]
33+
collection = database["restaurants"]
34+
35+
with collection.watch(full_document='updateLookup') as stream:
36+
for change in stream:
37+
print(change)
38+
# end-change-stream-post-image

source/includes/usage-examples/retrieve-code-examples.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,10 @@
3030

3131
for document in results:
3232
print(document)
33-
# end-distinct
33+
# end-distinct
34+
35+
# start-watch-for-changes
36+
with collection.watch() as stream:
37+
for change in stream:
38+
print(change)
39+
# end-watch-for-changes

source/read.txt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Read Data from MongoDB
2929
/read/count
3030
/read/distinct
3131
/read/cursors
32+
/read/change-streams
3233

3334
Overview
3435
--------
@@ -127,3 +128,15 @@ Retrieve Distinct Values
127128

128129
To learn more about the ``distinct()`` method, see the
129130
:ref:`pymongo-distinct` guide.
131+
132+
Monitor Data Changes
133+
--------------------
134+
135+
.. literalinclude:: /includes/usage-examples/retrieve-code-examples.py
136+
:start-after: start-watch-for-changes
137+
:end-before: end-watch-for-changes
138+
:language: python
139+
:copyable:
140+
141+
To learn more about the ``watch()`` method, see the
142+
:ref:`pymongo-change-streams` guide.

source/read/change-streams.txt

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
.. _pymongo-change-streams:
2+
3+
====================
4+
Monitor Data Changes
5+
====================
6+
7+
.. contents:: On this page
8+
:local:
9+
:backlinks: none
10+
:depth: 2
11+
:class: singlecol
12+
13+
.. facet::
14+
:name: genre
15+
:values: reference
16+
17+
.. meta::
18+
:keywords: watch, code example
19+
20+
Overview
21+
--------
22+
23+
In this guide, you can learn how to use a **change stream** to monitor real-time
24+
changes to your database. A change stream is a MongoDB Server feature that
25+
allows your application to subscribe to data changes on a collection, database,
26+
or deployment.
27+
28+
Sample Data
29+
~~~~~~~~~~~
30+
31+
The examples in this guide use the ``sample_restaurants.restaurants`` collection
32+
from the :atlas:`Atlas sample datasets </sample-data>`. To learn how to create a
33+
free MongoDB Atlas cluster and load the sample datasets, see
34+
:ref:`<pymongo-get-started>`.
35+
36+
Open a Change Stream
37+
--------------------
38+
39+
To open a change stream, call the ``watch()`` method. The instance on which you
40+
call the ``watch()`` method on determines the scope of events that the change
41+
stream listens for. You can call the ``watch()`` method on the following
42+
classes:
43+
44+
- ``MongoClient``: To monitor all changes in the MongoDB deployment
45+
- ``Database``: To monitor changes in all collections in the database
46+
- ``Collection``: To monitor changes in the collection
47+
48+
The following example opens a change stream on the ``restaurants`` collection
49+
and outputs changes as they occur:
50+
51+
.. literalinclude:: /includes/read/change-streams.py
52+
:start-after: start-open-change-stream
53+
:end-before: end-open-change-stream
54+
:language: python
55+
56+
To begin watching for changes, run the application. Then, in a separate
57+
application or shell, modify the ``restaurants`` collection. The following
58+
example updates a document with a ``name`` field value of ``Blarney Castle``:
59+
60+
.. _pymongo-change-stream-update:
61+
62+
.. literalinclude:: /includes/read/change-streams.py
63+
:start-after: start-update-for-change-stream
64+
:end-before: end-update-for-change-stream
65+
:language: python
66+
67+
When you update the collection, the change stream application prints the change
68+
as it occurs. The printed change event resembles the
69+
following:
70+
71+
.. code-block:: sh
72+
73+
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...),
74+
'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')},
75+
'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
76+
77+
Modify the Change Stream Output
78+
-------------------------------
79+
80+
You can pass the ``pipeline`` parameter to the ``watch()`` method to modify the
81+
change stream output. This parameter allows you to watch for only specified
82+
change events. Format the parameter as a list of objects that each represent an
83+
aggregation stage.
84+
85+
You can specify the following stages in the ``pipeline`` parameter:
86+
87+
- ``$addFields``
88+
- ``$match``
89+
- ``$project``
90+
- ``$replaceRoot``
91+
- ``$replaceWith``
92+
- ``$redact``
93+
- ``$set``
94+
- ``$unset``
95+
96+
The following example uses the ``pipeline`` parameter to open a change stream
97+
that records only update operations:
98+
99+
.. literalinclude:: /includes/read/change-streams.py
100+
:start-after: start-change-stream-pipeline
101+
:end-before: end-change-stream-pipeline
102+
:language: python
103+
104+
To learn more about modifying your change stream output, see the
105+
:manual:`Modify Change Stream Output
106+
</changeStreams/#modify-change-stream-output>` section in the MongoDB Server
107+
manual.
108+
109+
Modify ``watch()`` Behavior
110+
---------------------------
111+
112+
The ``watch()`` method accepts optional parameters, which represent
113+
options you can use to configure the operation. If you don't specify any
114+
options, the driver does not customize the operation.
115+
116+
The following table describes the options you can set to customize the behavior
117+
of ``watch()``:
118+
119+
.. list-table::
120+
:widths: 30 70
121+
:header-rows: 1
122+
123+
* - Property
124+
- Description
125+
126+
* - ``pipeline``
127+
- | A list of aggregation pipeline stages that modify the output of the
128+
change stream.
129+
130+
* - ``full_document``
131+
- | Specifies whether to show the full document after the change, rather
132+
than showing only the changes made to the document. To learn more about
133+
this option, see :ref:`pymongo-change-stream-pre-post-image`.
134+
135+
* - ``full_document_before_change``
136+
- | Specifies whether to show the full document as it was before the change, rather
137+
than showing only the changes made to the document. To learn more about
138+
this option, see :ref:`pymongo-change-stream-pre-post-image`.
139+
140+
* - ``resume_after``
141+
- | Directs ``watch()`` to resume returning changes after the
142+
operation specified in the resume token.
143+
| Each change stream event document includes a resume token as the ``_id``
144+
field. Pass the entire ``_id`` field of the change event document that
145+
represents the operation you want to resume after.
146+
| ``resume_after`` is mutually exclusive with ``start_after`` and ``start_at_operation_time``.
147+
148+
* - ``start_after``
149+
- | Directs ``watch()`` to start a new change stream after the
150+
operation specified in the resume token. Allows notifications to
151+
resume after an invalidate event.
152+
| Each change stream event document includes a resume token as the ``_id``
153+
field. Pass the entire ``_id`` field of the change event document that
154+
represents the operation you want to resume after.
155+
| ``start_after`` is mutually exclusive with ``resume_after`` and ``start_at_operation_time``.
156+
157+
* - ``start_at_operation_time``
158+
- | Directs ``watch()`` to return only events that occur after the
159+
specified timestamp.
160+
| ``start_at_operation_time`` is mutually exclusive with ``resume_after`` and ``start_after``.
161+
162+
* - ``max_await_time_ms``
163+
- | The maximum amount of time, in milliseconds, the server waits for new
164+
data changes to report to the change stream cursor before returning an
165+
empty batch. Defaults to 1000 milliseconds.
166+
167+
* - ``show_expanded_events``
168+
- | Starting in MongoDB Server v6.0, change streams support change notifications
169+
for Data Definition Language (DDL) events, such as the ``createIndexes`` and ``dropIndexes`` events. To
170+
include expanded events in a change stream, create the change stream
171+
cursor and set this parameter to ``True``.
172+
173+
* - ``batch_size``
174+
- | The maximum number of change events to return in each batch of the
175+
response from the MongoDB cluster.
176+
177+
* - ``collation``
178+
- | The collation to use for the change stream cursor.
179+
180+
* - ``session``
181+
- | An instance of ``ClientSession``.
182+
183+
* - ``comment``
184+
- | A comment to attach to the operation.
185+
186+
.. _pymongo-change-stream-pre-post-image:
187+
188+
Include Pre-Images and Post-Images
189+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
190+
191+
.. important::
192+
193+
You can enable pre-images and post-images on collections only if your
194+
deployment uses MongoDB v6.0 or later.
195+
196+
By default, when you perform an operation on a collection, the
197+
corresponding change event includes only the delta of the fields
198+
modified by that operation. To see the full document before or after a
199+
change, specify the ``full_document_before_change`` or the ``full_document``
200+
parameters in the ``watch()`` method.
201+
202+
The **pre-image** is the full version of a document *before* a change. To include the
203+
pre-image in the change stream event, set the ``full_document_before_change``
204+
parameter to one of the following values:
205+
206+
- ``whenAvailable``: The change event includes a pre-image of the
207+
modified document for change events only if the pre-image is available.
208+
- ``required``: The change event includes a pre-image of the
209+
modified document for change events. If the pre-image is not available, the
210+
driver raises an error.
211+
212+
The **post-image** is the full version of a document *after* a change. To include the
213+
post-image in the change stream event, set the ``full_document`` parameter to
214+
one of the following values:
215+
216+
- ``updateLookup``: The change event includes a copy of the entire changed
217+
document from some time after the change.
218+
- ``whenAvailable``: The change event includes a post-image of the
219+
modified document for change events only if the post-image is available.
220+
- ``required``: The change event includes a post-image of the
221+
modified document for change events. If the post-image is not available, the
222+
driver raises an error.
223+
224+
The following example calls the ``watch()`` method on a collection and includes the post-image
225+
of updated documents by specifying the ``fullDocument`` parameter:
226+
227+
.. literalinclude:: /includes/read/change-streams.py
228+
:start-after: start-change-stream-post-image
229+
:end-before: end-change-stream-post-image
230+
:language: python
231+
232+
With the change stream application running, updating a document in the
233+
``restaurants`` collection by using the :ref:`preceding update example
234+
<pymongo-change-stream-update>` prints a change event resembling the following:
235+
236+
.. code-block:: sh
237+
:emphasize-lines: 2,5
238+
239+
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...),
240+
'fullDocument': {'_id': ObjectId('...'), 'address': {...}, 'borough': 'Queens',
241+
'cuisine': 'Irish', 'grades': [...], 'name': 'Blarney Castle', 'restaurant_id': '40366356'},
242+
'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')},
243+
'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
244+
245+
To learn more about pre-images and post-images, see
246+
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>`
247+
in the MongoDB Server manual.
248+
249+
Additional Information
250+
----------------------
251+
252+
To learn more about change streams, see :manual:`Change Streams
253+
</changeStreams>` in the MongoDB Server manual.
254+
255+
API Documentation
256+
~~~~~~~~~~~~~~~~~
257+
258+
To learn more about any of the methods or types discussed in this
259+
guide, see the following API documentation:
260+
261+
- `watch() <{+api-root+}pymongo/collection.html#pymongo.collection.Collection.watch>`__
262+
- `update_one() <{+api-root+}pymongo/collection.html#pymongo.collection.Collection.update_one>`__

0 commit comments

Comments
 (0)