Skip to content

Commit 9086828

Browse files
authored
DOCSP-26099: cs is an async iterable (#498)
* DOCSP-26099: change stream async iterable * addtl work * MW PR fixes 1 * small fixes * MW PR fixes 2 * small fixes * remove redundant note
1 parent b9162d1 commit 9086828

File tree

3 files changed

+151
-66
lines changed

3 files changed

+151
-66
lines changed

source/code-snippets/usage-examples/changeStream.js

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,22 @@ const uri = "<connection string uri>";
55

66
const client = new MongoClient(uri);
77

8-
const simulateAsyncPause = () =>
9-
new Promise(resolve => {
10-
setTimeout(() => resolve(), 1000);
11-
});
12-
138
let changeStream;
149
async function run() {
1510
try {
1611
const database = client.db("insertDB");
1712
const collection = database.collection("haikus");
1813

19-
// open a Change Stream on the "haikus" collection
14+
// Open a Change Stream on the "haikus" collection
2015
changeStream = collection.watch();
2116

22-
// set up a listener when change events are emitted
23-
changeStream.on("change", next => {
24-
// process any change event
25-
console.log("received a change to the collection: \t", next);
26-
});
27-
28-
await simulateAsyncPause();
29-
30-
await collection.insertOne({
31-
title: "Record of a Shriveled Datum",
32-
content: "No bytes, no problem. Just insert a document, in MongoDB",
33-
});
34-
35-
await simulateAsyncPause();
17+
// Print change events
18+
for await (const change of changeStream) {
19+
console.log("Received change:\n", change);
20+
}
3621

3722
await changeStream.close();
3823

39-
console.log("closed the change stream");
4024
} finally {
4125
await client.close();
4226
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { MongoClient } from "mongodb";
2+
3+
// Replace the uri string with your MongoDB deployment's connection string.
4+
const uri = "<connection string uri>";
5+
6+
const client = new MongoClient(uri);
7+
8+
const simulateAsyncPause = () =>
9+
new Promise(resolve => {
10+
setTimeout(() => resolve(), 1000);
11+
});
12+
13+
let changeStream;
14+
async function run() {
15+
try {
16+
const database = client.db("insertDB");
17+
const collection = database.collection("haikus");
18+
19+
// open a Change Stream on the "haikus" collection
20+
changeStream = collection.watch();
21+
22+
// set up a listener when change events are emitted
23+
changeStream.on("change", next => {
24+
// process any change event
25+
console.log("received a change to the collection: \t", next);
26+
});
27+
28+
await simulateAsyncPause();
29+
30+
await collection.insertOne({
31+
title: "Record of a Shriveled Datum",
32+
content: "No bytes, no problem. Just insert a document, in MongoDB",
33+
});
34+
35+
await simulateAsyncPause();
36+
37+
await changeStream.close();
38+
39+
console.log("closed the change stream");
40+
} finally {
41+
await client.close();
42+
}
43+
}
44+
run().catch(console.dir);

source/usage-examples/changeStream.txt

Lines changed: 102 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,29 @@ The ``watch()`` method accepts an ``options`` object as the second parameter. Re
3434
section for more information on the settings you can configure with this object.
3535

3636
The ``watch()`` method returns an instance of a `ChangeStream <{+api+}/classes/ChangeStream.html>`__. You can read events from
37-
change streams by iterating over them or listening for events. Select the tab that corresponds to the way you want to
38-
read events from the change stream below.
37+
change streams by iterating over them or listening for events.
3938

40-
.. include:: /includes/changestream-paradigm-warning.rst
39+
Select the tab that corresponds to the way you want to
40+
read events from the change stream:
4141

4242
.. tabs::
4343

4444
.. tab::
45-
:tabid: Iterative
45+
:tabid: Idiomatic Iteration
46+
47+
Starting in version 4.12, ``ChangeStream`` objects are async
48+
iterables. With this change, you can use ``for-await`` loops to
49+
retrieve events from an open change stream:
50+
51+
.. code-block:: js
52+
:copyable: false
53+
54+
for await (const change of changeStream) {
55+
console.log("Received change: ", change);
56+
}
57+
58+
.. tab::
59+
:tabid: Manual Iteration
4660

4761
You can call methods on the ``ChangeStream`` object such as:
4862

@@ -67,7 +81,6 @@ read events from the change stream below.
6781
can specify logic in the listener to process the change event document
6882
when it is received.
6983

70-
7184
You can control the change stream by calling ``pause()`` to stop emitting events or ``resume()`` to continue to emit events.
7285

7386
To stop processing change events, call the `close() <{+api+}/classes/ChangeStream.html#close>`__ method on the
@@ -77,42 +90,16 @@ read events from the change stream below.
7790

7891
changeStream.close();
7992

80-
Visit the following resources for additional material on the classes and
81-
methods presented above:
93+
.. include:: /includes/changestream-paradigm-warning.rst
8294

83-
- :manual:`Change streams </changeStreams/>`
84-
- :manual:`Change events </reference/change-events/>`
85-
- :manual:`Aggregation pipeline </reference/operator/aggregation-pipeline/>`
86-
- :manual:`Aggregation stages </changeStreams/#modify-change-stream-output>`
87-
- `ChangeStream class API documentation <{+api+}/classes/ChangeStream.html>`__
88-
- `Collection.watch() <{+api+}/classes/Collection.html#watch>`__,
89-
- `Db.watch() <{+api+}/classes/Db.html#watch>`__,
90-
- `MongoClient.watch() API documentation <{+api+}/classes/MongoClient.html#watch>`__
95+
Examples
96+
--------
9197

92-
Example
93-
-------
98+
Iteration
99+
~~~~~~~~~
94100

95101
The following example opens a change stream on the ``haikus`` collection in
96-
the ``insertDB`` database. Let's create a listener function to receive and
97-
print change events that occur on the collection.
98-
99-
First, open the change stream on the collection and then define a listener
100-
on the change stream using the ``on()`` method. Once set, generate a change
101-
event by performing a change to the collection.
102-
103-
To generate the change event on the collection, let's use ``insertOne()``
104-
method to add a new document. Since the ``insertOne()`` may run before the
105-
listener function can register, we use a timer, defined as
106-
``simulateAsyncPause`` to wait 1 second before executing the insert.
107-
108-
We also use ``simulateAsyncPause`` after the insertion of the document
109-
to provide ample time for the listener function to receive the change
110-
event and for the listener to complete its execution before
111-
closing the ``ChangeStream`` instance using the ``close()`` method.
112-
113-
The timers used in this example are only necessary for this demonstration
114-
to make sure there is enough time to register listener and have the
115-
listener process the event before exiting.
102+
the ``insertDB`` database and prints change events as they occur:
116103

117104
.. include:: /includes/connect-guide-note.rst
118105

@@ -137,20 +124,30 @@ listener process the event before exiting.
137124
The JavaScript and TypeScript code snippets above are identical. There are no
138125
TypeScript specific features of the driver relevant to this use case.
139126

140-
If you run the preceding example, you should see the following output:
127+
When you run this code and then make a change to the ``haikus``
128+
collection, such as performing an insert or delete operation, you can
129+
see the change event document printed in your terminal.
141130

142-
.. code-block:: javascript
131+
For example, if you insert a document to the collection, the code prints
132+
the following output:
133+
134+
.. code-block:: none
143135
:copyable: false
144136

145-
received a change to the collection: {
146-
_id: { _data: '825EC...' },
137+
Received change:
138+
{
139+
_id: {
140+
_data: '...'
141+
},
147142
operationType: 'insert',
148-
clusterTime: new Timestamp { ... },
149-
fullDocument: { _id: new ObjectId(...), title: 'Record of a Shriveled Datum', content: 'No bytes, no problem. Just insert a document, in MongoDB' },
143+
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
144+
fullDocument: {
145+
_id: new ObjectId("..."),
146+
...
147+
},
150148
ns: { db: 'insertDB', coll: 'haikus' },
151-
documentKey: { _id: new ObjectId(...) }
149+
documentKey: { _id: new ObjectId("...") }
152150
}
153-
closed the change stream
154151

155152
.. note:: Receive Full Documents From Updates
156153

@@ -169,3 +166,63 @@ If you run the preceding example, you should see the following output:
169166

170167
const changeStream = collection.watch(pipeline, options);
171168

169+
Listener Function
170+
~~~~~~~~~~~~~~~~~
171+
172+
The following example opens a change stream on the ``haikus`` collection in
173+
the ``insertDB`` database. Let's create a listener function to receive and
174+
print change events that occur on the collection.
175+
176+
First, open the change stream on the collection and then define a listener
177+
on the change stream using the ``on()`` method. Once you set the
178+
listener, generate a change event by performing a change to the collection.
179+
180+
To generate the change event on the collection, let's use the ``insertOne()``
181+
method to add a new document. Since ``insertOne()`` may run before the
182+
listener function can register, we use a timer, defined as
183+
``simulateAsyncPause`` to wait 1 second before executing the insert.
184+
185+
We also use ``simulateAsyncPause`` after the insertion of the document.
186+
This provides ample time for the listener function to receive the change
187+
event and for the listener to complete its execution before
188+
closing the ``ChangeStream`` instance using the ``close()`` method.
189+
190+
.. note:: Reason to include timers
191+
192+
The timers used in this example are only for demonstration
193+
purposes. They make sure that there is enough time to register
194+
the listener and have the listener process the change event before
195+
exiting.
196+
197+
.. tabs::
198+
199+
.. tab:: JavaScript
200+
:tabid: javascript
201+
202+
.. literalinclude:: /code-snippets/usage-examples/changeStream_listener.js
203+
:language: javascript
204+
:linenos:
205+
206+
.. tab:: TypeScript
207+
:tabid: typescript
208+
209+
.. literalinclude:: /code-snippets/usage-examples/changeStream_listener.js
210+
:language: javascript
211+
:linenos:
212+
213+
.. note:: Identical Code Snippets
214+
215+
The JavaScript and TypeScript code snippets above are identical. There are no
216+
TypeScript specific features of the driver relevant to this use case.
217+
218+
Visit the following resources for additional material on the classes and
219+
methods mentioned on this page:
220+
221+
- :manual:`Change streams </changeStreams/>`
222+
- :manual:`Change events </reference/change-events/>`
223+
- :manual:`Aggregation pipeline </reference/operator/aggregation-pipeline/>`
224+
- :manual:`Aggregation stages </changeStreams/#modify-change-stream-output>`
225+
- `ChangeStream class API documentation <{+api+}/classes/ChangeStream.html>`__
226+
- `Collection.watch() <{+api+}/classes/Collection.html#watch>`__,
227+
- `Db.watch() <{+api+}/classes/Db.html#watch>`__,
228+
- `MongoClient.watch() API documentation <{+api+}/classes/MongoClient.html#watch>`__

0 commit comments

Comments
 (0)