Loading and Saving Data

Interacting with MongoDB using Scala in Apache Spark

As part of this hands-on exercise, we will learn how to read and write data in MongoDB from Apache Spark, using the Scala-based spark-shell.

Please note that we are using data downloaded from here:

  • http://www.barchartmarketdata.com/data-samples/mstf.csv
  • http://www.barchartmarketdata.com/sample-data-feeds

We have made this data available at /cxldata/findata/mstf.csv in CloudxLab.

Copy the data into your home directory using the following Linux command on the console:

cp /cxldata/findata/mstf.csv ~/
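
Optionally, inspect the first few lines of the CSV to confirm that the first line is a header row; the --headerline flag used in the import step below relies on it:

head -3 ~/mstf.csv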

Import the data into MongoDB. This will create a collection named minibars in the marketdata database:

mongoimport mstf.csv --type csv --headerline -d marketdata -c minibars

Start the mongo shell:

mongo

In the mongo shell, run the following commands to check whether the data was imported or not.

Select the database

use marketdata

Just check one document in the collection

db.minibars.findOne()

It should print something like this:

{
    "_id" : ObjectId("600531b34b493e54d28e9987"),
    "Symbol" : "MSFT",
    "Timestamp" : "2009-08-24 09:32",
    "Day" : 24,
    "Open" : 24.29,
    "High" : 24.33,
    "Low" : 24.29,
    "Close" : 24.31,
    "Volume" : 230729
}
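
You can also verify the number of imported documents:

db.minibars.count()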

If you get a "not master and slaveOk=false" error like the following, run rs.slaveOk() as shown below the error output.

2021-06-26T18:43:00.898+0000 E QUERY    [thread1] Error: error: {
    "ok" : 0,
    "errmsg" : "not master and slaveOk=false",
    "code" : 13435,
    "codeName" : "NotMasterNoSlaveOk"
} :
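
Running rs.slaveOk() tells the shell that it is okay to read from a secondary member of the replica set. After running it, retry the findOne() query:

rs.slaveOk()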

Now open a web console in a new tab and launch spark-shell with the mongo-hadoop package. You can learn more about mongo-hadoop here: https://github.com/mongodb/mongo-hadoop. To learn about its Spark usage, see: https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage

spark-shell --packages 'org.mongodb.mongo-hadoop:mongo-hadoop-core:2.0.2'

Wait for the spark-shell prompt (scala>).

At the prompt, run the following commands:

// Import the objects needed
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.bson.BSONObject
import com.mongodb.hadoop.{
  MongoInputFormat, MongoOutputFormat,
  BSONFileInputFormat, BSONFileOutputFormat}
import com.mongodb.hadoop.io.MongoUpdateWritable

// Initialize Mongo Config
val mongoConfig = new Configuration()

mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/marketdata.minibars")

val documents = sc.newAPIHadoopRDD(
    mongoConfig,                // Configuration
    classOf[MongoInputFormat],  // InputFormat
    classOf[Object],            // Key type
    classOf[BSONObject])        // Value type

// Print the first ten documents

documents.take(10)
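
The RDD returned by newAPIHadoopRDD holds (key, BSONObject) pairs, so you can read individual fields with get(). As a minimal sketch (field names taken from the sample document above), the following extracts a (Symbol, Close) pair from each bar:

// Pull individual fields out of each BSON document
val closes = documents.map { case (_, doc) =>
  (doc.get("Symbol").toString, doc.get("Close").toString.toDouble)
}

closes.take(5)

If you only need a subset of the collection, mongo-hadoop also supports a mongo.input.query setting that takes a JSON query string; set it on the Configuration before creating the RDD.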

So you were able to read documents from MongoDB using Apache Spark. Now let us try to save data to MongoDB. The following code saves the data to the outcoln collection of the marketdata database.

NOTE: If the collection already exists, this will throw an error. Either drop the collection from MongoDB (use marketdata; db.outcoln.drop()) or use a different name, such as marketdata.outcoln1 instead of marketdata.outcoln.

val outputConfig = new Configuration()
outputConfig.set("mongo.output.uri", "mongodb://localhost:27017/marketdata.outcoln")

documents.saveAsNewAPIHadoopFile(
    "file:///this-is-completely-unused",
    classOf[Object],
    classOf[BSONObject],
    classOf[MongoOutputFormat[Object, BSONObject]],
    outputConfig)
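
Beyond copying documents verbatim, you can also construct new BSON documents and save those instead. The following is a minimal sketch, assuming a hypothetical output collection marketdata.dailyavg: it computes the average closing price per Day and writes one document per day (the pair's key is used as the document's _id):

// Average the Close price per Day and write the results back to MongoDB
import org.bson.BasicBSONObject

val dailyAvg = documents
  .map { case (_, doc) =>
    (doc.get("Day").toString.toInt, (doc.get("Close").toString.toDouble, 1)) }
  .reduceByKey { case ((sum1, n1), (sum2, n2)) => (sum1 + sum2, n1 + n2) }
  .map { case (day, (sum, n)) =>
    val out = new BasicBSONObject()
    out.put("Day", Int.box(day))
    out.put("AvgClose", Double.box(sum / n))
    (Int.box(day): Object, out: BSONObject)  // the key becomes the _id
  }

val avgOutputConfig = new Configuration()
avgOutputConfig.set("mongo.output.uri", "mongodb://localhost:27017/marketdata.dailyavg")

dailyAvg.saveAsNewAPIHadoopFile(
    "file:///this-is-completely-unused",
    classOf[Object],
    classOf[BSONObject],
    classOf[MongoOutputFormat[Object, BSONObject]],
    avgOutputConfig)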

// Exit the Scala shell
:quit

Now, let's check if the documents were really written to MongoDB. Launch the MongoDB shell again:

mongo

In the MongoDB shell, run the following commands:

// Change database
use marketdata

// Check if there are any records in our collection

db.outcoln.findOne()

It should print something like

{
    "_id" : ObjectId("600531b34b493e54d28e9987"),
    "Symbol" : "MSFT",
    "Timestamp" : "2009-08-24 09:32",
    "Day" : 24,
    "Open" : 24.29,
    "High" : 24.33,
    "Low" : 24.29,
    "Close" : 24.31,
    "Volume" : 230729
}
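
To double-check that everything was copied, compare the document counts of the source and output collections; they should match:

db.minibars.count()
db.outcoln.count()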

This completes the exercise of reading and writing data in MongoDB using Spark and Scala.

References: https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage

