Writing Spark Applications

Approach 1 - Using Spark Shell

Before jumping into spark-shell, let's understand what each line does.

var accessLogs = sc.textFile("/data/spark/project/access/access.log.45.gz")

This statement creates an RDD from the file located in HDFS. Note that even though it is a compressed (gzip) file, Spark will decompress it automatically. The resulting RDD, accessLogs, is made up of records where each record is a line from the file.
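
A quick sanity check is to count the records; there should be one record per line of the decompressed log (the exact number depends on your file):

accessLogs.count()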

accessLogs.take(10)

Let's check what's in the RDD by using take(10). You can see that each record of the accessLogs RDD is a line from the log file.

Now, let's create a function that checks whether a line starts with a valid IP address. This function should accept a line as its argument and return the boolean value true if the line contains an IP address, and false otherwise.

def containsIP(line: String): Boolean = line.matches("^([0-9\\.]+) .*$")
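
As a quick check, you can call the function directly in the shell. The two sample lines below are made up for illustration and are not taken from the actual log:

containsIP("""127.0.0.1 - - [10/Oct/2016:08:30:00 +0000] "GET /index.html HTTP/1.1" 200 512""")   // true
containsIP("this line does not start with an IP address")                                         // false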

Let's filter the records in the accessLogs RDD using this function.

var ipaccesslogs = accessLogs.filter(containsIP)
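
To see how many lines the filter drops, you can compare this count with the earlier accessLogs.count() (the numbers depend on your log file):

ipaccesslogs.count()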

Now, let us extract only the IP address from each line.

def extractIP(line: String): String = {
    // Regular expression with a capturing group for the leading IP address.
    val pattern = "^([0-9\\.]+) .*$".r
    // Pattern matching against the regex extracts the captured group into ip.
    val pattern(ip) = line
    ip
}
var ips = ipaccesslogs.map(line => (extractIP(line), 1))
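
As an illustration, extractIP pulls out just the first field of a line, and the map turns every line into an (ip, 1) pair ready for counting. The sample line below is made up, not taken from the actual log:

extractIP("""127.0.0.1 - - [10/Oct/2016:08:30:00 +0000] "GET /index.html HTTP/1.1" 200 512""")   // returns "127.0.0.1"
ips.take(5)   // a few (ip, 1) pairs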

// Count the occurrences of each IP address and sort in descending order of count.

var ipcounts = ips.reduceByKey((a, b) => a + b)
var ipcountsOrdered = ipcounts.sortBy(f => f._2, false)
ipcountsOrdered.take(10)

As you can see, we are able to print the top 10 IP addresses. If we had to save the result to HDFS, we could simply use the saveAsTextFile() operation on the RDD.
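
For example, saving the ordered counts might look like the line below; the output path is only an illustration, and saveAsTextFile() will fail if the directory already exists:

ipcountsOrdered.saveAsTextFile("/data/spark/project/output/top_ips")   // hypothetical output directory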

The code is available as a GitHub gist here: apache_log_topnlinks.scala