Project - Writing Spark Applications

Approach 1 - Using Spark Shell




Before jumping into spark-shell, let's understand what each line does.

var accessLogs = sc.textFile("/data/spark/project/access/access.log.2.gz")

This statement creates an RDD from the file located in HDFS. Note that even though the file is gzip-compressed, Spark decompresses it automatically. Each record in the accessLogs RDD is one line of the file.
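To make the "one record per line" idea concrete without a cluster, here is a minimal local sketch in plain Python. It is only an analogy for what sc.textFile does with a .gz file, not Spark's actual implementation. The helper read_gz_lines, the file name sample.log.gz, and the two log lines are made up for illustration.

```python
import gzip
import os
import tempfile

def read_gz_lines(path):
    """Return the lines of a gzip-compressed text file, without newlines."""
    # Mode "rt" makes gzip decompress and decode the bytes to text on the fly.
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        return [line.rstrip("\n") for line in handle]

# Made-up sample data so the sketch is self-contained (not from the real log).
sample_lines = [
    '10.0.0.1 - - [01/Jan/2020:00:00:01 +0000] "GET / HTTP/1.1" 200 512',
    '10.0.0.2 - - [01/Jan/2020:00:00:02 +0000] "GET /about HTTP/1.1" 200 128',
]
tmp_dir = tempfile.mkdtemp()
sample_path = os.path.join(tmp_dir, "sample.log.gz")
with gzip.open(sample_path, "wt", encoding="utf-8") as handle:
    handle.write("\n".join(sample_lines) + "\n")

# Each element of records corresponds to one line of the compressed file.
records = read_gz_lines(sample_path)
print(records[0])
```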

accessLogs.take(10)

Let's check what's in the RDD by calling take(10). You can see that each record of the accessLogs RDD is a line.

Now, let's create a function that checks whether a line starts with an IP address. The function accepts a line as its argument and returns the Boolean value true if the line begins with an IP address, and false otherwise.

def containsIP(line: String): Boolean = line.matches("^([0-9\\.]+) .*$")
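It can help to see what the pattern used by containsIP accepts and rejects. The sketch below applies the same regular expression in plain Python to a few made-up lines. The function name starts_with_ip_like_token and the sample lines are hypothetical. Note that the pattern only checks for a leading run of digits and dots followed by a space, so it does not validate that the token is a well-formed IP address.

```python
import re

# Same pattern as containsIP: a leading run of digits and dots,
# then a space, then the rest of the line.
PATTERN = re.compile(r"^([0-9.]+) .*$")

def starts_with_ip_like_token(line):
    """Return True if the line begins with a digits-and-dots token."""
    return PATTERN.match(line) is not None

# Made-up lines for illustration only.
samples = [
    '192.168.1.10 - - "GET /index.html HTTP/1.1" 200',  # leading IP
    'localhost - - "GET / HTTP/1.1" 200',               # hostname, no IP
    '999.999 - - "GET / HTTP/1.1" 200',                 # digits and dots, but not a real IP
    '- - 192.168.1.10 "GET / HTTP/1.1" 200',            # IP present, but not at the start
]

results = [starts_with_ip_like_token(s) for s in samples]
print(results)
```

The third sample shows the limitation: the check is loose by design, which is usually acceptable for a log file whose first field is always either an IP address or something that is clearly not one.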

Let's filter the records in the accessLogs RDD using this function.

var ipaccesslogs = accessLogs.filter(containsIP)

Now, let us extract only the IP address from each line.

def extractIP(line: String): String = {
    // Use the regular expression as an extractor to capture the leading IP address.
    val pattern = "^([0-9\\.]+) .*$".r
    val pattern(ip) = line
    ip
}
// Pair each IP with a count of 1 so the counts can be summed per IP.
var ips = ipaccesslogs.map(line => (extractIP(line), 1))

// Count the hits per IP, then sort by count in descending order
var ipcounts = ips.reduceByKey((a, b) => a + b)
var ipcountsOrdered = ipcounts.sortBy(f => f._2, false)
ipcountsOrdered.take(10)

As you can see, we are able to print the top 10 IP addresses. To save the result to HDFS, we could simply call saveAsTextFile() on the RDD.
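To see what the reduceByKey and sortBy steps compute, here is a plain-Python sketch of the same count-and-sort logic on a small in-memory list. It runs on a single machine and does not use Spark. The (ip, 1) pairs are made-up stand-ins for the contents of the ips RDD.

```python
from collections import Counter

# Hypothetical (ip, 1) pairs standing in for the ips RDD.
pairs = [
    ("10.0.0.1", 1), ("10.0.0.2", 1), ("10.0.0.1", 1),
    ("10.0.0.3", 1), ("10.0.0.1", 1), ("10.0.0.2", 1),
]

# Equivalent of reduceByKey((a, b) => a + b): add up values that share a key.
counts = Counter()
for ip, one in pairs:
    counts[ip] += one

# Equivalent of sortBy(f => f._2, false): order by count, largest first.
ordered = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)

# Equivalent of take(2): keep only the first two entries.
top2 = ordered[:2]
print(top2)
```

In Spark the same steps run in parallel across partitions, which is why the summing function passed to reduceByKey must give the same result regardless of the order in which values are combined.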

The code is available in git gist here: apache_log_topnlinks.scala

Using Python

The same code is available for pyspark below:

import re

accessLogs = sc.textFile("/data/spark/project/access/access.log.2.gz")
accessLogs.take(10)

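# In a raw string the backslash is not doubled, and inside [...] the dot needs no escape.
PAT = r"^([0-9.]+) .*$"
def containsIP(line):
    # re.match returns a match object or None; convert it to a boolean
    return re.match(PAT, line) is not None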

# Test if it is working
containsIP(accessLogs.take(10)[3])

ipaccesslogs = accessLogs.filter(containsIP)

def extractIP(line):
    # Here we are using the regular expression for matching the strings with a certain pattern.
    m = re.match(PAT, line)
    ip = m.group(1)
    return ip

# Check if it works
extractIP(accessLogs.take(10)[3])

ips = ipaccesslogs.map(lambda line: (extractIP(line), 1))

# Check if it works
ips.take(10)

ipcounts = ips.reduceByKey(lambda a,b: a+b)

# Check if it works
ipcounts.take(10)

# Now order the data by counts
ipcountsOrdered = ipcounts.sortBy(lambda f: f[1], ascending=False)
ipcountsOrdered.take(10)
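The extractIP function above assumes that its input already matches the pattern, which holds here because the RDD was filtered with containsIP first. If it were called on an unfiltered line, re.match would return None and m.group(1) would raise an AttributeError. A defensive variant might look like the sketch below. The name extract_ip_or_none is hypothetical and not part of the original code.

```python
import re

PAT = re.compile(r"^([0-9.]+) .*$")

def extract_ip_or_none(line):
    """Return the leading IP-like token, or None if the line does not match."""
    m = PAT.match(line)
    return m.group(1) if m else None

# Made-up lines for illustration only.
matched = extract_ip_or_none('192.168.1.10 - - "GET / HTTP/1.1" 200')
unmatched = extract_ip_or_none('localhost - - "GET / HTTP/1.1" 200')
print(matched, unmatched)
```

With a function like this, one option is to map every line and then drop the None results, instead of filtering before mapping. Both approaches apply the regular expression to each line; which one reads better is a matter of taste.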
