Apache Spark - Key Value RDD

Project - Handling Binary Files

You can load binary files from a directory as RDD using sc.binaryFiles.
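For reference, sc.binaryFiles returns a pair RDD of (file path, PortableDataStream); the stream is read lazily, so nothing is pulled into memory until you ask for the bytes. A minimal sketch, using a hypothetical directory:

val files = sc.binaryFiles("/some/hdfs/dir")  // hypothetical path
val (name, stream) = files.first              // (full path, PortableDataStream)
val bytes = stream.toArray                    // read this file's bytes into memory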

Objective

We are going to load gzipped files from HDFS and process them using Spark.

The files are located in HDFS in the directory /data/SentimentFiles/SentimentFiles/upload/data/
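You can list this directory first to see what it contains (the tweets_raw subdirectory used below holds the gzipped Flume files):

hadoop fs -ls /data/SentimentFiles/SentimentFiles/upload/data/tweets_raw/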

Taking a look

First, download a copy of one of the files from HDFS using the following command:

hadoop fs -get /data/SentimentFiles/SentimentFiles/upload/data/tweets_raw/FlumeData.1367523670393.gz

Then use the Unix file command to check the file type:

file FlumeData.1367523670393.gz

This should display something like:

FlumeData.1367523670393.gz: gzip compressed data, was "FlumeData.1367523670393", from Unix, last modified: Thu Jul 18 03:11:25 2013
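Since it is gzip-compressed text, you can also peek at the raw records without extracting the file, assuming zcat is available on your machine:

zcat FlumeData.1367523670393.gz | head -2

Each record should be a JSON-encoded tweet.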

Start Spark Shell

First, export the environment variables that Spark needs in order to locate the Hadoop and YARN configuration:

export HADOOP_CONF_DIR=/etc/hadoop/conf/
export YARN_CONF_DIR=/etc/hadoop/conf/

Launch the Spark shell (Spark 2.2.1) using the following command:

/usr/spark2.2.1/bin/spark-shell
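Once the shell is up, the SparkContext is available as sc. As a quick sanity check, print the version, which should match the install path:

sc.version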

Copy and paste the following code into the Spark shell:

// Imports
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.GZIPInputStream

//Create an RDD from the directory; each record is (filename, file_content),
//where file_content is a PortableDataStream over the raw bytes
var tweets_raw = sc.binaryFiles("/data/SentimentFiles/SentimentFiles/upload/data/tweets_raw/")
//Print the filename of the first record to confirm the load worked
tweets_raw.take(2)(0)._1

//Decompress gzip-compressed bytes into the original uncompressed bytes.
//Note: looping on available() is unreliable for GZIPInputStream, so we
//read until read() signals end-of-stream instead.
def decompress(compressed: Array[Byte]): Array[Byte] = {
    val gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
    val output = new ByteArrayOutputStream()
    val buffer = new Array[Byte](1024)
    var byteCount = gzipInputStream.read(buffer)
    while (byteCount > 0) {
      output.write(buffer, 0, byteCount)
      byteCount = gzipInputStream.read(buffer)
    }
    gzipInputStream.close()
    output.toByteArray
  }
//Decompress each file's content; t._2 is a PortableDataStream and toArray reads all its bytes
var decompressed_tweets = tweets_raw.map(t => decompress(t._2.toArray))

//Convert the decompressed bytes to UTF-8 text
def bytesToString(bytes: Array[Byte]): String = {
    new String(bytes, "UTF-8")
}
// jsonTweets is an RDD with one string per file, containing the raw JSON tweets
var jsonTweets = decompressed_tweets.map(bytesToString)
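// Optional sanity check: print the start of one record to confirm
// that decompression and decoding produced readable JSON
jsonTweets.take(1).foreach(s => println(s.take(200)))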

//Keep only the records that contain a particular tweet ID (a plain substring match)
var result = jsonTweets.filter(_.contains("330043883738234880"))

//Save the result in HDFS in your home directory in a directory named jsonTweets
result.saveAsTextFile("jsonTweets")
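
Once the job completes, you can verify the output from a terminal, assuming your default filesystem is HDFS. You should see a _SUCCESS marker and one part-* file per partition:

hadoop fs -ls jsonTweets
hadoop fs -cat jsonTweets/part-* | head -1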