Before jumping into spark-shell, let's understand what each line does.
var accessLogs = sc.textFile("/data/spark/project/access/access.log.2.gz")
This statement creates an RDD from the file located in HDFS. Note that even though the file is compressed, Spark decompresses it automatically. The RDD accessLogs is made up of records, where each record is a line from the file.
accessLogs.take(10)
Let's check what's in the RDD by using take(10). You can see that each record of the accessLogs RDD is a line.
Now, let's create a function that checks whether a line starts with an IP address. The function should accept a line as its argument and return the boolean value true if the line begins with an IP-like token (a run of digits and dots followed by a space), and false otherwise.
def containsIP(line: String): Boolean = line.matches("^([0-9\\.]+) .*$")
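To get a feel for what this pattern accepts, the same regular expression can be tried in plain Python without Spark. The sketch below is illustrative only: the sample lines are made up, and the stricter `is_valid_ipv4` helper is a hypothetical addition (not part of the exercise) showing that the pattern only checks for a leading run of digits and dots, not for a well-formed address.

```python
import ipaddress
import re

# Same pattern as in the exercise: a leading run of digits and dots, then a space.
PAT = r"^([0-9\.]+) .*$"

def contains_ip(line):
    """Return True if the line starts with an IP-like token."""
    return re.match(PAT, line) is not None

def is_valid_ipv4(token):
    """Hypothetical stricter check: is the token a well-formed IPv4 address?"""
    try:
        ipaddress.IPv4Address(token)
        return True
    except ValueError:
        return False

# Made-up sample lines, for illustration only.
samples = [
    '10.0.0.1 - - [01/Jan/2020:00:00:01 +0000] "GET / HTTP/1.1" 200 512',
    'example.com - - [01/Jan/2020:00:00:02 +0000] "GET / HTTP/1.1" 200 512',
    '999.1..2 - - [01/Jan/2020:00:00:03 +0000] "GET / HTTP/1.1" 200 512',
]

for line in samples:
    token = line.split(" ", 1)[0]
    # Print the leading token, the loose regex result, and the strict result.
    print(token, contains_ip(line), is_valid_ipv4(token))
```

The third sample passes the loose pattern but fails the strict check, which is acceptable for this exercise because access logs normally start with a real client address.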
Let's filter the records in the accessLogs RDD using this function.
var ipaccesslogs = accessLogs.filter(containsIP)
Now, let us extract only the IP address.
def extractIP(line: String): String = {
  // Match the line against the regular expression and bind the first capture group to ip.
  val pattern = "^([0-9\\.]+) .*$".r
  val pattern(ip: String) = line
  ip
}
var ips = ipaccesslogs.map(line => (extractIP(line), 1))
// Count the occurrences of each IP
var ipcounts = ips.reduceByKey((a, b) => a + b)
// Sort by count in descending order
var ipcountsOrdered = ipcounts.sortBy(f => f._2, false)
ipcountsOrdered.take(10)
As you can see, we are able to print the top 10 IP addresses. To save the result to HDFS, we could simply call the saveAsTextFile() operation on the RDD.
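The pipeline above boils down to three steps: emit (ip, 1) for each line, sum the ones per IP, and sort by the sum in descending order. As a rough, Spark-free illustration of what the result looks like, here is the same computation on a tiny in-memory list, using Python's `collections.Counter` in place of reduceByKey and sortBy. The sample lines are made up.

```python
import re
from collections import Counter

PAT = re.compile(r"^([0-9\.]+) .*$")

# Made-up log lines standing in for the records of the RDD.
lines = [
    '10.0.0.1 - - "GET /a HTTP/1.1" 200 10',
    '10.0.0.2 - - "GET /b HTTP/1.1" 200 20',
    '10.0.0.1 - - "GET /c HTTP/1.1" 404 0',
    'not-an-ip - - "GET /d HTTP/1.1" 200 5',
    '10.0.0.1 - - "GET /e HTTP/1.1" 200 7',
]

# filter + map analogue: keep lines that start with an IP-like token and pull it out
ips = [m.group(1) for m in map(PAT.match, lines) if m]

# reduceByKey analogue: count occurrences per IP
ip_counts = Counter(ips)

# sortBy(count, descending) + take(10) analogue
top = ip_counts.most_common(10)
print(top)
```

On a real cluster the counting is distributed across partitions, but the shape of the output is the same: a list of (ip, count) pairs with the most frequent IP first.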
The code is available in git gist here: apache_log_topnlinks.scala
The same code for PySpark is given below:
import re
accessLogs = sc.textFile("/data/spark/project/access/access.log.2.gz")
accessLogs.take(10)
PAT = r"^([0-9\.]+) .*$"
def containsIP(line):
    if re.match(PAT, line):
        return True
    return False
# Test if it is working
containsIP(accessLogs.take(10)[3])
ipaccesslogs = accessLogs.filter(containsIP)
def extractIP(line):
    # Match the line against the regular expression and return the first captured group (the IP).
    m = re.match(PAT, line)
    ip = m.group(1)
    return ip
# Check if it works
extractIP(accessLogs.take(10)[3])
ips = ipaccesslogs.map(lambda line: (extractIP(line), 1))
# Check if it works
ips.take(10)
ipcounts = ips.reduceByKey(lambda a,b: a+b)
# Check if it works
ipcounts.take(10)
# Now order the data by counts
ipcountsOrdered = ipcounts.sortBy(lambda f: f[1], False)
ipcountsOrdered.take(10)
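The earlier note about saving the result with saveAsTextFile() applies to PySpark as well. One detail worth knowing is that saveAsTextFile() writes the string form of each element, so a tuple ends up in the file as text like ('10.0.0.1', 3). Below is a minimal sketch, assuming you would rather have tab-separated lines. The helper names `format_record` and `save_counts` are hypothetical, as is whatever output path you pass in, and the output directory must not already exist.

```python
def format_record(pair):
    """Turn an (ip, count) tuple into a tab-separated line."""
    ip, count = pair
    return f"{ip}\t{count}"

def save_counts(ip_counts_rdd, output_path):
    """Write one 'ip<TAB>count' line per record under output_path.

    ip_counts_rdd is expected to be an RDD of (ip, count) tuples, such as
    ipcountsOrdered; output_path is a directory that does not exist yet.
    """
    ip_counts_rdd.map(format_record).saveAsTextFile(output_path)

# Local check of the formatting step (no Spark needed):
print(format_record(("10.0.0.1", 3)))
```

In the PySpark shell, this would be called with ipcountsOrdered as the first argument and an HDFS directory of your choice as the second.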