Apache Spark - Key Value RDD


Project - Handling Binary Files

You can load binary files from a directory as an RDD using sc.binaryFiles. Each record in the resulting RDD is a (filename, file content) pair.

Objective

We are going to load gzipped files from HDFS and then process them with Spark to find the decompressed length of each file.

The files are located in HDFS in the directory /data/SentimentFiles/SentimentFiles/upload/data/

Taking a look

First, download a copy of one file from HDFS using the following command:

hadoop fs -get /data/SentimentFiles/SentimentFiles/upload/data/tweets_raw/FlumeData.1367523670393.gz

Then use the Unix file command to identify the file type:

file FlumeData.1367523670393.gz

This should display something like:

FlumeData.1367523670393.gz: gzip compressed data, was "FlumeData.1367523670393", from Unix, last modified: Thu Jul 18 03:11:25 2013
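
The same check can be sketched in Python. This is a minimal illustration, not part of the original exercise: it relies on the fact that every gzip stream begins with the two magic bytes 0x1f 0x8b, and it builds a small sample in memory rather than reading the real file. The helper name looks_like_gzip is hypothetical.

```python
import gzip

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream (RFC 1952)

def looks_like_gzip(data: bytes) -> bool:
    """Return True if the byte string starts with the gzip magic number."""
    return data[:2] == GZIP_MAGIC

# Build a small gzip payload in memory instead of reading the real file
sample = gzip.compress(b'{"text": "hello"}\n')
print(looks_like_gzip(sample))         # True
print(looks_like_gzip(b"plain text"))  # False
```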

Next, export the environment variables required to access Hadoop from Spark:

export HADOOP_CONF_DIR=/etc/hadoop/conf/
export YARN_CONF_DIR=/etc/hadoop/conf/

Using Scala

Start Spark Shell

Launch the Spark shell using the following command:

spark-shell

Copy and paste the following code:

// Imports
import java.util.zip.GZIPInputStream

// Create an RDD from the directory such that each record is (filename, file_content)
var tweets_raw = sc.binaryFiles("/data/SentimentFiles/SentimentFiles/upload/data/tweets_raw/")
// Show the name of the first file
tweets_raw.take(2)(0)._1

// Decompresses a gzipped file and returns the length, in bytes, of its decompressed content
import java.io.DataInputStream
import org.apache.spark.input.PortableDataStream

def count_length(compressed: PortableDataStream): Int = {
    var compressedis: DataInputStream = null
    try {
        // Assign to the outer variable (no `var` here) so the finally block can close the stream
        compressedis = compressed.open()

        val gzipInputStream = new GZIPInputStream(compressedis)
        var totalByteCount = 0
        val bytes = new Array[Byte](1024)
        // read returns the number of bytes read, or -1 at the end of the stream
        var byteCount = gzipInputStream.read(bytes)
        while (byteCount != -1) {
            totalByteCount += byteCount
            byteCount = gzipInputStream.read(bytes)
        }
        totalByteCount
    }
    finally {
        if (compressedis != null)
            compressedis.close()
    }
}

var decompressed_tweets_lengths = tweets_raw.mapValues(t => count_length(t))

//Save the lengths in HDFS in your home directory in a directory named onlytweets_length_after_decompressing
decompressed_tweets_lengths.saveAsTextFile("onlytweets_length_after_decompressing")

Using Python

Start pyspark and copy-paste the following code:

import zlib
tweets_raw = sc.binaryFiles("/data/SentimentFiles/SentimentFiles/upload/data/tweets_raw/")


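# Despite its name, count_bits returns the decompressed length in bytes.
# wbits=31 tells zlib to expect a gzip header and trailer.
def count_bits(fo):
    fname, fobj = fo
    return (fname, len(zlib.decompress(fobj, 31)))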

decompressed_tweets_lengths = tweets_raw.map(count_bits)

# Save the lengths in HDFS in your home directory in a directory named onlytweets_length_after_decompressing
decompressed_tweets_lengths.saveAsTextFile("onlytweets_length_after_decompressing")
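
The decompression step can be tried locally without Spark. The following is a minimal sketch under stated assumptions: the function names count_bytes_at_once and count_bytes_streaming are hypothetical, and the sample payload is generated in memory rather than taken from the tweet files. It shows that zlib.decompress with wbits=31 (16 for the gzip wrapper plus 15 for the maximum window size) yields the same length as reading the stream in 1024-byte chunks, which is the approach the Scala version takes.

```python
import gzip
import io
import zlib

def count_bytes_at_once(data: bytes) -> int:
    """Decompress the whole gzip payload in memory and return its length."""
    # wbits=31 is 16 + 15: expect a gzip header, use the maximum window size
    return len(zlib.decompress(data, 31))

def count_bytes_streaming(data: bytes, chunk_size: int = 1024) -> int:
    """Decompress in fixed-size chunks so only one decompressed chunk is held at a time."""
    total = 0
    with gzip.GzipFile(fileobj=io.BytesIO(data)) as stream:
        while True:
            chunk = stream.read(chunk_size)
            if not chunk:  # empty bytes means end of stream
                break
            total += len(chunk)
    return total

payload = b"tweet " * 1000  # 6000 bytes of sample data
compressed = gzip.compress(payload)
print(count_bytes_at_once(compressed))    # 6000
print(count_bytes_streaming(compressed))  # 6000
```

The chunked variant may be preferable for large files, since the one-shot call materializes the entire decompressed content before measuring it.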

INSTRUCTIONS
  • Given the gzip-compressed files in the HDFS folder /data/SentimentFiles/SentimentFiles/upload/data/tweets_raw/, find the length of each file after decompression.

  • The result should be saved in files in the folder onlytweets_length_after_decompressing such that each line holds a tuple of the form:

    (hdfs://ip-172-31-35-141.ec2.internal:8020/data/SentimentFiles/SentimentFiles/upload/data/tweets_raw/FlumeData.1367523670393.gz,5359162)
    (hdfs://ip-172-31-35-141.ec2.internal:8020/data/SentimentFiles/SentimentFiles/upload/data/tweets_raw/FlumeData.1367523670394.gz,1899502)
    

    To check this, we can do:

    hadoop fs -cat onlytweets_length_after_decompressing/part-00000|head
    

    This prints the first 10 lines of the file onlytweets_length_after_decompressing/part-00000. We can observe that the data is in tuple format.
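
If you want to inspect the result programmatically, the tuple lines can be parsed back into a path and a length. This is a minimal sketch assuming the unquoted (path,length) format shown in the sample lines above; output written by the Python version uses the Python tuple representation, with quotes around the path, which this sketch does not handle. The helper name parse_result_line is hypothetical.

```python
from typing import Tuple

def parse_result_line(line: str) -> Tuple[str, int]:
    """Split a '(path,length)' line into the path and the integer length."""
    inner = line.strip()
    if not (inner.startswith("(") and inner.endswith(")")):
        raise ValueError(f"not a tuple line: {line!r}")
    # Split on the last comma so commas inside the path do not break parsing
    path, length = inner[1:-1].rsplit(",", 1)
    return path, int(length)

line = "(hdfs://ip-172-31-35-141.ec2.internal:8020/data/SentimentFiles/SentimentFiles/upload/data/tweets_raw/FlumeData.1367523670393.gz,5359162)"
path, length = parse_result_line(line)
print(path.rsplit("/", 1)[-1], length)  # FlumeData.1367523670393.gz 5359162
```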

