An example docker-compose setup that runs a single Apache Spark node connected to MongoDB via the MongoDB Spark Connector.
** For demo purposes only **
Start by running the command:
docker-compose run spark bash
This starts the Spark node and the MongoDB node, and gives you a bash shell on the Spark node.
From the Spark instance, you can reach the MongoDB instance using the hostname mongodb.
You can find a small example dataset in /home/ubuntu/times.json, which you can load using initDocuments.scala:
spark-shell --conf "spark.mongodb.input.uri=mongodb://mongodb:27017/spark.times" --conf "spark.mongodb.output.uri=mongodb://mongodb/spark.output" --packages org.mongodb.spark:mongo-spark-connector_${SCALA_VERSION}:${MONGO_SPARK_VERSION} -i ./initDocuments.scala
See examples.scala for examples that query MongoDB, run a simple aggregation, use DataFrame SQL, and finally write output back to MongoDB. This file is also available inside the Spark container at /home/ubuntu/examples.scala
Run the Spark shell by executing:
spark-shell --conf "spark.mongodb.input.uri=mongodb://mongodb:27017/spark.times" --conf "spark.mongodb.output.uri=mongodb://mongodb/spark.output" --packages org.mongodb.spark:mongo-spark-connector_${SCALA_VERSION}:${MONGO_SPARK_VERSION}
You can also append -i <file.scala> to execute a Scala file via the Spark shell. For example:
spark-shell --conf "spark.mongodb.input.uri=mongodb://mongodb:27017/spark.times" --conf "spark.mongodb.output.uri=mongodb://mongodb/spark.output" --packages org.mongodb.spark:mongo-spark-connector_${SCALA_VERSION}:${MONGO_SPARK_VERSION} -i ./examples.scala
The code block in examples.scala (lines 14-25) is an example of grouping. For example, suppose you have these 4 documents:
{ "doc": "A", "timestamp" : ISODate("2016-02-15T00:43:04.686Z"), "myid" : 1 }
{ "doc": "B", "timestamp" : ISODate("2016-02-15T00:43:06.310Z"), "myid" : 2 }
{ "doc": "C", "timestamp" : ISODate("2016-01-03T00:43:07.534Z"), "myid" : 1 }
{ "doc": "D", "timestamp" : ISODate("2016-01-03T00:43:09.214Z"), "myid" : 2 }
The code block groups by myid and sorts by latest timestamp, so it returns only two documents, doc:A and doc:B. The grouping removes duplicate myid values by returning only the document with the latest timestamp for each myid.
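To make the grouping rule concrete, here is a minimal sketch of the same logic using plain Scala collections. It is not the code from examples.scala and does not use Spark or the MongoDB Spark Connector; the names TimeDoc, LatestPerId and latestPerId are hypothetical and exist only for illustration.

```scala
import java.time.Instant

// Hypothetical stand-in for the four documents shown above (not a connector type).
final case class TimeDoc(doc: String, timestamp: Instant, myid: Int)

object LatestPerId {
  val docs: Seq[TimeDoc] = Seq(
    TimeDoc("A", Instant.parse("2016-02-15T00:43:04.686Z"), 1),
    TimeDoc("B", Instant.parse("2016-02-15T00:43:06.310Z"), 2),
    TimeDoc("C", Instant.parse("2016-01-03T00:43:07.534Z"), 1),
    TimeDoc("D", Instant.parse("2016-01-03T00:43:09.214Z"), 2)
  )

  // Group by myid, keep the document with the latest timestamp in each group,
  // then order the survivors by myid so the result is deterministic.
  def latestPerId(input: Seq[TimeDoc]): Seq[TimeDoc] =
    input
      .groupBy(_.myid)
      .values
      .map(group => group.maxBy(_.timestamp.toEpochMilli))
      .toSeq
      .sortBy(_.myid)

  def main(args: Array[String]): Unit =
    latestPerId(docs).foreach(d => println(s"doc:${d.doc} myid:${d.myid}"))
}
```

Running LatestPerId.main(Array.empty) prints doc:A myid:1 and doc:B myid:2, matching the two documents described above. In the Spark shell, the definitions can be entered with :paste.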
See related article: