As part of this hands-on exercise, we will learn how to read and write data in MongoDB using Apache Spark via the spark-shell, which runs Scala.
Please note that we are using market data that has been downloaded in advance. We have made this data available at /cxldata/findata/mstf.csv in CloudxLab.
Copy the data into your home directory using the following Linux command on the console.
cp /cxldata/findata/mstf.csv ~/
Import the data into MongoDB. This will create a collection minibars in the marketdata database:
mongoimport mstf.csv --type csv --headerline -d marketdata -c minibars
Start the mongo shell:
mongo
On the mongo shell, run the following commands to check whether the data was imported.
Select the database
use marketdata
Just check one document in the collection
db.minibars.findOne()
It should print something like this:
{
    "_id" : ObjectId("600531b34b493e54d28e9987"),
    "Symbol" : "MSFT",
    "Timestamp" : "2009-08-24 09:32",
    "Day" : 24,
    "Open" : 24.29,
    "High" : 24.33,
    "Low" : 24.29,
    "Close" : 24.31,
    "Volume" : 230729
}
If you get a "not master and slaveOk=false" error like the following, run rs.slaveOk() and then retry:
2021-06-26T18:43:00.898+0000 E QUERY [thread1] Error: error: {
"ok" : 0,
"errmsg" : "not master and slaveOk=false",
"code" : 13435,
"codeName" : "NotMasterNoSlaveOk"
} :
Now open a web console in a new tab and launch the spark-shell with the mongo-hadoop package. You can learn more about mongo-hadoop here: https://github.com/mongodb/mongo-hadoop. To learn about its Spark usage, see: https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
spark-shell --packages 'org.mongodb.mongo-hadoop:mongo-hadoop-core:2.0.2'
Wait for the spark-shell prompt scala>
On the prompt run the following commands:
// Import the objects needed
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.bson.BSONObject
import com.mongodb.hadoop.{
MongoInputFormat, MongoOutputFormat,
BSONFileInputFormat, BSONFileOutputFormat}
import com.mongodb.hadoop.io.MongoUpdateWritable
// Initialize Mongo Config
val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/marketdata.minibars")
val documents = sc.newAPIHadoopRDD(
mongoConfig, // Configuration
classOf[MongoInputFormat], // InputFormat
classOf[Object], // Key type
classOf[BSONObject]) // Value type
// Fetch the first 10 documents (the REPL will display them)
documents.take(10)
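Each element of the RDD is a (key, document) pair, where the document is a BSONObject. Below is a minimal sketch of working with those documents; the names bars and maxHigh are illustrative, not part of the exercise, and it assumes mongoimport stored the numeric CSV columns as doubles.

// Illustrative sketch: extract (Symbol, High) pairs from each BSON document.
// Assumes the numeric fields were imported as doubles by mongoimport.
val bars = documents.map { case (_, doc) =>
  (doc.get("Symbol").toString, doc.get("High").toString.toDouble)
}

// Compute the maximum High per symbol in Spark
val maxHigh = bars.reduceByKey(math.max)
maxHigh.take(5).foreach(println)

// mongo-hadoop can also push a filter down to MongoDB; set it on the
// input configuration before creating the RDD, e.g.:
// mongoConfig.set("mongo.input.query", "{\"Symbol\": \"MSFT\"}")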
So you can see that you were able to read documents from MongoDB using Apache Spark. Now let us try to save data to MongoDB. The following code saves the data into the collection outcoln in the marketdata database.
NOTE: If the collection already exists, the job will throw an error. Either drop the collection from MongoDB (use marketdata; db.outcoln.drop()) or use a different name such as marketdata.outcoln1 instead of marketdata.outcoln.
// Configure the output URI pointing at the marketdata.outcoln collection
val outputConfig = new Configuration()
outputConfig.set("mongo.output.uri", "mongodb://localhost:27017/marketdata.outcoln")

// Save the RDD; the path argument is required by the Hadoop API
// but is ignored by MongoOutputFormat
documents.saveAsNewAPIHadoopFile(
  "file:///this-is-completely-unused",
  classOf[Object],
  classOf[BSONObject],
  classOf[MongoOutputFormat[Object, BSONObject]],
  outputConfig)
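You are not limited to copying the input documents verbatim. Here is a hedged sketch of building new BSON documents from selected fields and saving them to a separate collection; the collection name outsummary and the value slimDocs are made up for illustration.

import org.bson.BasicBSONObject

// Illustrative: keep only the Symbol and Close fields of each document
val slimDocs: RDD[(Object, BSONObject)] = documents.map { case (_, in) =>
  val out: BSONObject = new BasicBSONObject()
  out.put("Symbol", in.get("Symbol"))
  out.put("Close", in.get("Close"))
  (null, out)
}

val slimConfig = new Configuration()
// "outsummary" is a made-up collection name for this sketch
slimConfig.set("mongo.output.uri", "mongodb://localhost:27017/marketdata.outsummary")

slimDocs.saveAsNewAPIHadoopFile(
  "file:///this-is-completely-unused",
  classOf[Object],
  classOf[BSONObject],
  classOf[MongoOutputFormat[Object, BSONObject]],
  slimConfig)

The BSONFileInputFormat and BSONFileOutputFormat classes imported earlier serve a similar purpose for reading and writing .bson dump files instead of live collections.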
// Exit the Scala shell
:quit
Now, let's check if the documents were really written to MongoDB. Launch the MongoDB shell again:
mongo
On the MongoDB shell, run the following commands
// Change database
use marketdata
// Check if there are any records in our collection
db.outcoln.findOne()
It should print something like this:
{
    "_id" : ObjectId("600531b34b493e54d28e9987"),
    "Symbol" : "MSFT",
    "Timestamp" : "2009-08-24 09:32",
    "Day" : 24,
    "Open" : 24.29,
    "High" : 24.33,
    "Low" : 24.29,
    "Close" : 24.31,
    "Volume" : 230729
}
This completes the exercise of reading and writing data in MongoDB using Spark and Scala.
References: https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage