Adv Spark Programming

Adv Spark Programming - Data Partitioning Example

Data Partitioning Example:

Let’s take the example of an online magazine. Users read all kinds of articles on the website, and each user also has preferred subscriptions based on topics.

Our objective is to count, in near real time, how many users visited a link that was not one of their subscribed topics.

Let’s write down the kinds of data we have.

We have a large table, UserData, loaded as an RDD of pairs with two fields: the user identifier and the topics that the user has subscribed to. So UserData represents the users’ subscriptions.

We also keep a record of each user’s activity, that is, who clicked on which topic link. We call these events. The events RDD contains (user ID, link info) pairs. It is a much smaller table because it holds only the last five minutes of user activity.

How will we accomplish this?

We will have to join userData with events on user ID periodically. Joining two RDDs is essentially a lookup based on the key, which here is the user ID.

The code will run fine, but it will be inefficient because it involves a lot of network transfer.

Let’s take a look at the code.

First, we are loading userData once from a file located in HDFS.

Then we define a function, processNewLogs, which accepts logFileName as its argument. This argument is a file that contains the last five minutes of user activity. processNewLogs will be called periodically to process each new log file.

Inside this function, we will load the file that is passed as argument as an RDD. Please note that both userData and events are loaded from sequence files.

Afterwards, we are joining the userData with events to form an RDD with the name joined.

We need to filter this joined RDD in order to keep only the users who have visited topics that they have not subscribed to.

Then we print the result. Although this gives correct results, it is inefficient: every time processNewLogs runs, the join shuffles both userData and events across the network, even though userData has not changed.
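The logic of these steps (join on user ID, filter, count) can be modelled in plain Python without a cluster. This is only a sketch of the logic, not Spark code: a dict stands in for the userData RDD, a list of pairs stands in for events, and all sample values are made up for illustration. It counts visits rather than distinct users, which is an assumption about what the result should be.

```python
# Plain-Python model of the join-and-filter logic; not Spark code.
# All sample values below are made up for illustration.

# Stands in for userData: user ID -> set of subscribed topics (loaded once).
user_data = {
    1: {"sports", "tech"},
    2: {"politics"},
    3: {"tech"},
}


def process_new_logs(events):
    """Count visits to topics the visiting user is not subscribed to.

    `events` stands in for the RDD built from one log file:
    a list of (user ID, topic of the clicked link) pairs.
    """
    # Join on user ID: pair each event with that user's subscriptions.
    # Events from unknown users are dropped, as in an inner join.
    joined = [
        (user_id, (user_data[user_id], topic))
        for user_id, topic in events
        if user_id in user_data
    ]
    # Keep only visits to topics outside the user's subscriptions.
    off_topic = [
        (user_id, topic)
        for user_id, (subscriptions, topic) in joined
        if topic not in subscriptions
    ]
    return len(off_topic)


# Hypothetical five-minute batch of click events.
last_five_minutes = [(1, "tech"), (2, "sports"), (3, "politics"), (1, "travel"), (9, "tech")]
count = process_new_logs(last_five_minutes)
print("Visits to non-subscribed topics:", count)  # prints 3
```

In this model the join is a cheap dictionary lookup. In a cluster, the matching records for a user may sit on different machines, which is where the network cost comes from.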

To avoid too much shuffling, we can partition userData by user ID. To make lookups even faster, we can also persist it.

As you can see from the diagram, much less network transfer is needed to look up userData. As a result, the join becomes much faster.

This is very similar to creating an index on a database table, except that these RDDs are temporary, unlike tables.
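To get a feel for the saving, here is a rough plain-Python model that only counts how many records pass through a shuffle. It is a sketch under simplifying assumptions, not a measurement of Spark: it treats every shuffled record as crossing the network, and the dataset sizes, the `shuffle` helper, and the number of batches are all hypothetical.

```python
# Plain-Python model of shuffle cost; it counts records, it does not measure Spark.
NUM_PARTITIONS = 8


def shuffle(records, num_partitions=NUM_PARTITIONS):
    """Send every (key, value) record to the partition picked from its key.

    Returns the partitions and the number of records that were moved.
    """
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[key % num_partitions].append((key, value))
    return partitions, len(records)


# Hypothetical sizes: a large subscriptions table, a small batch of events.
user_data = [(user_id, {"tech"}) for user_id in range(100_000)]
events = [(user_id, "sports") for user_id in range(0, 100_000, 2_000)]  # 50 events
batches = 3  # number of times the periodic job runs

# Without a known partitioner, each join shuffles both datasets.
naive_moved = 0
for _ in range(batches):
    _, moved_users = shuffle(user_data)
    _, moved_events = shuffle(events)
    naive_moved += moved_users + moved_events

# With userData partitioned once and kept in memory, each join moves only the events.
user_partitions, partitioned_moved = shuffle(user_data)  # one-time cost
for _ in range(batches):
    _, moved_events = shuffle(events)
    partitioned_moved += moved_events

print(naive_moved, partitioned_moved)  # prints 300150 100150
```

The gap widens with every additional batch, because the one-time cost of partitioning userData is paid only once while the naive version pays it on every call.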

So, let’s take a look at the code.

First, we import a partitioner implementation, HashPartitioner.

Then we simply apply the partitionBy transformation with an instance of HashPartitioner. The argument to HashPartitioner, 8 here, is the number of partitions you want in the output.

The partitionBy method is available only on pair RDDs.

Partitioners are classes that specify how the keys of a key-value RDD (pair RDD) are distributed across partitions.

Examples of partitioners are HashPartitioner and RangePartitioner.
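A hash partitioner of the kind passed to partitionBy can be modelled in a few lines of plain Python. This is a sketch, not Spark's implementation: `java_string_hash` reproduces Java's String.hashCode for simple string keys (it assumes characters in the Basic Multilingual Plane), and `hash_partition` and `partition_by` are hypothetical helper names. The partition index is taken as the non-negative remainder of the hash code divided by the number of partitions.

```python
# Plain-Python model of hash partitioning; not Spark's implementation.


def java_string_hash(text):
    """Java's String.hashCode: s[0]*31^(n-1) + ... + s[n-1], as a signed 32-bit int."""
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h


def hash_partition(key, num_partitions):
    """Pick a partition index in [0, num_partitions) from the key's hash code."""
    # Python's % returns a non-negative result for a positive modulus,
    # so negative hash codes still map to a valid index.
    return java_string_hash(key) % num_partitions


def partition_by(pairs, num_partitions):
    """Group (key, value) pairs into num_partitions lists by the key's hash."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[hash_partition(key, num_partitions)].append((key, value))
    return partitions


# Hypothetical (user ID, subscribed topics) pairs with string user IDs.
pairs = [
    ("alice", {"tech"}),
    ("bob", {"sports"}),
    ("alice", {"travel"}),
    ("carol", {"politics"}),
]
partitions = partition_by(pairs, 8)
for index, partition in enumerate(partitions):
    print(index, partition)
```

The property that matters for joins is that the same key always maps to the same partition, so both "alice" records end up together no matter where they started.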

HashPartitioner is a very simple class.

It implements hash-based partitioning using Java's Object.hashCode method.

RangePartitioner, on the other hand, partitions sortable records by range into roughly equal ranges. The ranges are determined by sampling the content of the RDD passed in. Note that the actual number of partitions created by the RangePartitioner might not be the same as the partitions parameter, in the case where the number of sampled records is less than the value of partitions.
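The idea of range partitioning by sampling can be sketched in plain Python as follows. This is a simplified model, not Spark's algorithm: the helper names, the sample size, and the way split points are picked from the sorted sample are all assumptions made for illustration, and the input is assumed to be non-empty. It also shows why a small sample can produce fewer partitions than requested.

```python
# Plain-Python model of range partitioning by sampling; not Spark's algorithm.
import bisect
import random


def range_bounds(keys, num_partitions, sample_size=20, seed=0):
    """Choose up to num_partitions - 1 split points from a random sample of the keys."""
    rng = random.Random(seed)
    sample = sorted(set(rng.sample(keys, min(sample_size, len(keys)))))
    bounds = []
    for i in range(1, num_partitions):
        candidate = sample[i * len(sample) // num_partitions]
        # Repeated candidates collapse, so a small sample yields fewer ranges.
        if not bounds or candidate > bounds[-1]:
            bounds.append(candidate)
    return bounds


def range_partition(key, bounds):
    """Index of the first range whose upper bound is >= key (last range otherwise)."""
    return bisect.bisect_left(bounds, key)


# Many keys: the requested 4 partitions are obtained (3 split points).
keys = list(range(1000))
bounds = range_bounds(keys, 4)
print(len(bounds) + 1, "partitions")

# Only 3 distinct keys but 8 partitions requested: fewer ranges come out.
small_keys = [10, 20, 30]
small_bounds = range_bounds(small_keys, 8)
print(small_bounds, "->", len(small_bounds) + 1, "partitions")
```

Unlike hash partitioning, neighbouring keys land in the same or adjacent partitions, which is what makes this scheme useful for sorted output.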

You can repartition an RDD at any time using the repartition method. This method accepts an integer argument specifying the new number of partitions.

Most operations that involve a lookup by key benefit from partitioning. That is, such operations become faster if the RDD is partitioned on the same key that is being looked up. Examples of such operations are cogroup, groupWith, groupByKey, reduceByKey, join, leftOuterJoin, rightOuterJoin, combineByKey, and lookup.
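To see why a key-based operation such as reduceByKey benefits, consider this plain-Python model. It is a sketch, not Spark code, and the helper names and click data are hypothetical. Once the data is partitioned by key, every key lives in exactly one partition, so each partition can be reduced on its own and no records need to be exchanged afterwards.

```python
# Plain-Python model of a per-key reduction on key-partitioned data; not Spark code.
from collections import defaultdict


def partition_by_key(pairs, num_partitions):
    """Place each (key, value) pair in the partition chosen by its integer key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[key % num_partitions].append((key, value))
    return partitions


def reduce_by_key_locally(partitions):
    """Sum values per key inside each partition, without moving any records."""
    results = []
    for partition in partitions:
        totals = defaultdict(int)
        for key, value in partition:
            totals[key] += value
        results.append(dict(totals))
    return results


# Hypothetical (user ID, click count) pairs.
clicks = [(1, 1), (2, 1), (1, 1), (5, 1), (2, 1), (1, 1)]
per_partition = reduce_by_key_locally(partition_by_key(clicks, 4))

# Each key appears in only one partition, so the final result is a plain union.
merged = {}
for totals in per_partition:
    merged.update(totals)
print(merged)  # prints {1: 3, 5: 1, 2: 2}
```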