Commit cc9bcef

(DOCSP-21746) Add Scala examples to Structured Streaming page (#122)
* (DOCSP-21746) Add Scala examples to Structured Streaming page
1 parent 24d7e0b commit cc9bcef

File tree

1 file changed: 218 additions, 3 deletions

source/structured-streaming.txt

Lines changed: 218 additions & 3 deletions
@@ -92,6 +92,59 @@ Configuring a Write Stream to MongoDB
      For a complete list of methods, see the
      `pyspark Structured Streaming reference <https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss.html>`__.

+ - id: scala
+   content: |
+
+     Specify write stream configuration settings on your streaming
+     Dataset or DataFrame using the ``writeStream`` property. You
+     must specify the following configuration settings to write
+     to MongoDB:
+
+     .. list-table::
+        :header-rows: 1
+        :stub-columns: 1
+        :widths: 10 40
+
+        * - Setting
+          - Description
+
+        * - ``writeStream.format()``
+          - The format to use for write stream data. Use
+            ``mongodb``.
+
+        * - ``writeStream.option()``
+          - Use the ``option`` method to specify your MongoDB
+            deployment connection string with the
+            ``spark.mongodb.connection.uri`` option key.
+
+            You must specify a database and collection, either as
+            part of your connection string or with additional
+            ``option`` methods using the following keys:
+
+            - ``spark.mongodb.database``
+            - ``spark.mongodb.collection``
+
+        * - ``writeStream.outputMode()``
+          - The output mode to use. To view a list of all supported
+            output modes, see `the Scala outputMode documentation <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html#outputMode(outputMode:String):org.apache.spark.sql.streaming.DataStreamWriter[T]>`__.
+
+     The following code snippet shows how to use the preceding
+     configuration settings to stream data to MongoDB:
+
+     .. code-block:: scala
+        :copyable: true
+        :emphasize-lines: 2-3, 6
+
+        <streaming Dataset/DataFrame>.writeStream
+          .format("mongodb")
+          .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
+          .option("spark.mongodb.database", <database-name>)
+          .option("spark.mongodb.collection", <collection-name>)
+          .outputMode("append")
+
+     For a complete list of methods, see the
+     `Scala Structured Streaming reference <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/index.html>`__.
+
 .. _read-structured-stream:
 .. _continuous-processing:

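The write-stream snippet above configures the stream but does not call ``start()``, so nothing is written until a streaming query is started. A minimal end-to-end sketch of the same settings, assuming a local ``SparkSession``, Spark's built-in ``rate`` source as a stand-in input, and the MongoDB Spark Connector jar on the classpath (the app name, checkpoint path, and placeholder values below are illustrative, not taken from this commit):

.. code-block:: scala

   import org.apache.spark.sql.SparkSession

   // assumes a reachable Spark master and the MongoDB Spark Connector on the classpath
   val spark = SparkSession.builder()
     .appName("writeStreamSketch")   // illustrative name
     .getOrCreate()

   // Spark's built-in "rate" source emits (timestamp, value) rows; it stands in
   // here for whatever streaming Dataset/DataFrame you actually want to write
   val streamingDF = spark.readStream
     .format("rate")
     .option("rowsPerSecond", "1")
     .load()

   val query = streamingDF.writeStream
     .format("mongodb")
     .option("checkpointLocation", "/tmp/checkpoint/")   // illustrative checkpoint path
     .option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
     .option("spark.mongodb.database", "<database-name>")
     .option("spark.mongodb.collection", "<collection-name>")
     .outputMode("append")
     .start()                 // streaming begins only when start() is called

   query.awaitTermination()   // block until the stream stops or fails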
@@ -175,6 +228,72 @@ more about continuous processing, see the `Spark documentation <https://spark.ap
      For a complete list of methods, see the
      `pyspark Structured Streaming reference <https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss.html>`__.

+ - id: scala
+   content: |
+
+     To use continuous processing with the MongoDB Spark Connector,
+     add the ``trigger()`` method to the ``writeStream`` property
+     of the streaming Dataset or DataFrame that you create from
+     your MongoDB read stream. In your ``trigger()``, specify the
+     ``continuous`` parameter.
+
+     .. note::
+
+        The connector populates its read stream from your MongoDB
+        deployment's change stream. To populate your change stream,
+        perform update operations on your database.
+
+        To learn more about change streams, see
+        :manual:`Change Streams </changeStreams>` in the MongoDB
+        manual.
+
+     Specify read stream configuration settings on your local
+     SparkSession ``readStream``. You must specify the following
+     configuration settings to read from MongoDB:
+
+     .. list-table::
+        :header-rows: 1
+        :stub-columns: 1
+        :widths: 10 40
+
+        * - Setting
+          - Description
+
+        * - ``readStream.format()``
+          - The format to use for read stream data. Use ``mongodb``.
+
+        * - ``writeStream.trigger()``
+          - Enables continuous processing for your read stream.
+            Import and use the ``Trigger.Continuous()`` property.
+
+     The following code snippet shows how to use the preceding
+     configuration settings to stream data from MongoDB:
+
+     .. code-block:: scala
+        :copyable: true
+        :emphasize-lines: 1, 4, 8
+
+        import org.apache.spark.sql.streaming.Trigger
+
+        val streamingDataFrame = <local SparkSession>.readStream
+          .format("mongodb")
+          .load()
+
+        val query = streamingDataFrame.writeStream
+          .trigger(Trigger.Continuous("1 second"))
+          .format("memory")
+          .outputMode("append")
+
+        query.start()
+
+     .. note::
+
+        Spark does not begin streaming until you call the
+        ``start()`` method on a streaming query.
+
+     For a complete list of methods, see the
+     `Scala Structured Streaming reference <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/index.html>`__.
+
 Examples
 --------

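The note in this hunk points out that the connector's read stream is fed by your deployment's change stream, so it emits rows only when writes happen on the source collection. One way to generate such events from Scala, sketched here with the separate MongoDB Scala driver (``org.mongodb.scala``) rather than the Spark Connector, and assuming the placeholder connection string, database, and collection match the read stream you configured:

.. code-block:: scala

   import org.mongodb.scala._
   import scala.concurrent.Await
   import scala.concurrent.duration._

   // illustrative helper: insert a few documents so the connector's
   // change-stream-backed read stream has events to deliver
   val client = MongoClient("<mongodb-connection-string>")
   val collection = client
     .getDatabase("<database-name>")
     .getCollection("<collection-name>")

   val docs = (1 to 5).map(i => Document("value" -> i))
   // the Scala driver is asynchronous; toFuture() forces the insert to run
   Await.result(collection.insertMany(docs).toFuture(), 30.seconds)
   client.close()

Any insert, update, or delete on the source collection produces change-stream events; the loop above is only a convenient way to create a handful of them.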
@@ -227,14 +346,59 @@ Stream to MongoDB from a CSV File
             .option("checkpointLocation", "/tmp/pyspark/")
             .option("forceDeleteTempCheckpointLocation", "true")
             .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
-            .option('spark.mongodb.database', <database-name>)
-            .option('spark.mongodb.collection', <collection-name>)
+            .option("spark.mongodb.database", <database-name>)
+            .option("spark.mongodb.collection", <collection-name>)
             .outputMode("append")
         )

         # run the query
         query.start()

+ - id: scala
+   content: |
+
+     To create a :ref:`write stream <write-structured-stream>` to
+     MongoDB from a ``.csv`` file, first create a `DataStreamReader <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamReader.html>`__
+     from the ``.csv`` file, then use that ``DataStreamReader`` to
+     create a `DataStreamWriter <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html>`__
+     to MongoDB. Finally, use the ``start()`` method to begin the
+     stream.
+
+     As streaming data is read from the ``.csv`` file, it is added
+     to MongoDB in the `outputMode <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html#outputMode(outputMode:String):org.apache.spark.sql.streaming.DataStreamWriter[T]>`__
+     you specify.
+
+     .. code-block:: scala
+        :copyable: true
+        :emphasize-lines: 10, 16
+
+        // create a local SparkSession
+        val spark = SparkSession
+          .builder
+          .appName("writeExample")
+          .master("spark://spark-master:<port>")
+          .config("spark.jars", "<mongo-spark-connector>.jar")
+          .getOrCreate()
+
+        // define a streaming query
+        val query = spark.readStream
+          .format("csv")
+          .option("header", "true")
+          .schema(<csv-schema>)
+          .load(<csv-file-name>)
+          // manipulate your streaming data
+          .writeStream
+          .format("mongodb")
+          .option("checkpointLocation", "/tmp/")
+          .option("forceDeleteTempCheckpointLocation", "true")
+          .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
+          .option("spark.mongodb.database", <database-name>)
+          .option("spark.mongodb.collection", <collection-name>)
+          .outputMode("append")
+
+        // run the query
+        query.start()
+
 Stream to a CSV File from MongoDB
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

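In the Scala example above, ``<csv-schema>`` is left as a placeholder for a Spark schema object. A possible value for it, assuming a ``.csv`` file with the same stock-tick columns that the MongoDB-to-CSV example below declares (``company_symbol``, ``company_name``, ``price``, ``tx_time``):

.. code-block:: scala

   import org.apache.spark.sql.types.{DoubleType, StringType, StructType, TimestampType}

   // schema for the illustrative CSV columns; adjust names and types
   // to match your actual file
   val csvSchema = new StructType()
     .add("company_symbol", StringType)
     .add("company_name", StringType)
     .add("price", DoubleType)
     .add("tx_time", TimestampType)

You would then pass ``csvSchema`` to the ``.schema()`` call in place of the placeholder.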
@@ -294,4 +458,55 @@ Stream to a CSV File from MongoDB
         )

         # run the query
-        query.start()
+        query.start()
+
+ - id: scala
+   content: |
+
+     To create a :ref:`read stream <read-structured-stream>` to a
+     ``.csv`` file from MongoDB, first create a `DataStreamReader <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamReader.html>`__
+     from MongoDB, then use that ``DataStreamReader`` to
+     create a `DataStreamWriter <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html>`__
+     to a new ``.csv`` file. Finally, use the ``start()`` method
+     to begin the stream.
+
+     As new data is inserted into MongoDB, MongoDB streams that
+     data out to a ``.csv`` file in the `outputMode <https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html#outputMode(outputMode:String):org.apache.spark.sql.streaming.DataStreamWriter[T]>`__
+     you specify.
+
+     .. code-block:: scala
+        :copyable: true
+        :emphasize-lines: 17, 25, 28
+
+        // create a local SparkSession
+        val spark = SparkSession
+          .builder
+          .appName("readExample")
+          .master("spark://spark-master:<port>")
+          .config("spark.jars", "<mongo-spark-connector>.jar")
+          .getOrCreate()
+
+        // define the schema of the source collection
+        val readSchema = new StructType()
+          .add("company_symbol", StringType)
+          .add("company_name", StringType)
+          .add("price", DoubleType)
+          .add("tx_time", TimestampType)
+
+        // define a streaming query
+        val query = spark.readStream
+          .format("mongodb")
+          .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
+          .option("spark.mongodb.database", <database-name>)
+          .option("spark.mongodb.collection", <collection-name>)
+          .schema(readSchema)
+          .load()
+          // manipulate your streaming data
+          .writeStream
+          .format("csv")
+          .option("path", "/output/")
+          .trigger(Trigger.Continuous("1 second"))
+          .outputMode("append")
+
+        // run the query
+        query.start()

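The final Scala snippet uses ``SparkSession``, the ``StructType`` schema types, and ``Trigger`` without showing where they come from. Assuming stock Spark packages, the imports it relies on would be:

.. code-block:: scala

   // imports needed by the read-stream-to-CSV example above
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.streaming.Trigger
   import org.apache.spark.sql.types.{DoubleType, StringType, StructType, TimestampType}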