Advanced Spark Programming with Python

Adv Spark Programming - Understanding Persistence (Python)

Let us understand how we can improve the performance of our Spark applications by persisting (caching) RDDs.

Recall that RDDs are lazily evaluated. Whenever an action is executed on an RDD, all of the RDDs it depends on are evaluated as well. This means that if an action is called twice on the same RDD, the whole chain of computations leading up to it can run twice.

We may also wish to use the same RDD several times. To avoid recomputing it each time, we can cache or persist it using the cache() and persist() methods provided on RDD.
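
The effect of recomputation can be sketched without a cluster. The following is only a plain-Python analogy, not Spark's implementation: the LazyDataset class, the expensive function and the calls counter are hypothetical names made up for this illustration. It counts how often a function runs when an "action" is called twice, with and without caching.

```python
# Plain-Python analogy (an assumption-laden sketch, not how Spark works internally):
# a lazily evaluated dataset recomputes its parent chain on every action
# unless it has been marked for caching.

class LazyDataset:
    def __init__(self, compute):
        self._compute = compute    # zero-argument function that produces a list
        self._use_cache = False    # set by cache()
        self._cached = None        # filled by the first action after cache()

    def cache(self):
        # Only marks the dataset; nothing is computed yet.
        self._use_cache = True
        return self

    def map(self, fn):
        # "Transformation": records how to build the child, computes nothing.
        return LazyDataset(lambda: [fn(x) for x in self.collect()])

    def collect(self):
        # "Action": evaluates the dependency chain, or reuses the cached result.
        if self._use_cache and self._cached is not None:
            return self._cached
        result = self._compute()
        if self._use_cache:
            self._cached = result
        return result


calls = {"count": 0}

def expensive(x):
    calls["count"] += 1
    return x * x

base = LazyDataset(lambda: list(range(5)))

uncached = base.map(expensive)
uncached.collect()
uncached.collect()
uncached_calls = calls["count"]    # expensive ran for both actions

calls["count"] = 0
cached = base.map(expensive).cache()
cached.collect()
cached.collect()
cached_calls = calls["count"]      # expensive ran for the first action only

print(uncached_calls, cached_calls)
```

In this toy model the uncached dataset evaluates expensive for every element on each action, while the cached one does so only once.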

If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed. We can also replicate our data on multiple nodes if we want to be able to handle node failure without slowdown.

Let’s take a look at the following code to understand caching. First, create an RDD of the numbers from 0 to 99,999, split into fifty partitions.

nums = sc.parallelize(range(100000), 50)

Now, say we create another RDD, partitions, by summing the values in each partition of the nums RDD. We first define a function mysum, which sums the values of an iterator and returns the sum as a single-element list. We then apply mysum to nums using mapPartitions.

def mysum(itr):
    return [sum(itr)]

partitions = nums.mapPartitions(mysum)
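
To see what mysum produces per partition without starting Spark, here is a plain-Python sketch. It assumes that the fifty partitions are contiguous, equal-sized slices of 2,000 numbers each; the names num_partitions, chunk and partition_sums are illustrative and not part of the lesson.

```python
def mysum(itr):
    # Same function as in the lesson: consume an iterator, return a one-element list.
    return [sum(itr)]

num_partitions = 50
data = range(100000)
chunk = len(data) // num_partitions    # 2000 values per slice (assumed even split)

# Call mysum once per slice, the way mapPartitions calls it once per partition.
partition_sums = []
for i in range(num_partitions):
    part = data[i * chunk:(i + 1) * chunk]
    partition_sums.extend(mysum(iter(part)))

print(len(partition_sums))    # 50
print(partition_sums[0])      # 1999000, the sum of 0..1999
print(sum(partition_sums))    # 4999950000, the sum of 0..99999
```

Returning a list from mysum matters because mapPartitions expects an iterable of output elements for each partition, so each partition contributes exactly one value here.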

Now, say we create another RDD, partitions1, from the partitions RDD by increasing each value by one using map.

def incrByOne(x):
    return x + 1

partitions1 = partitions.map(incrByOne)

Every time we call an action on the partitions1 RDD, the partitions RDD is recomputed as well. So, if partitions1 is used frequently, this is inefficient. In such circumstances, we can cache the partitions1 RDD.

partitions1.persist()

We can persist an RDD using the persist method. This method accepts an instance of StorageLevel as an argument. The storage level specifies how the RDD should be persisted, for example in memory or on disk. If we do not provide any argument, the RDD is kept in memory only. We can check the current storage level with getStorageLevel.

partitions1.getStorageLevel()

Now, let’s try to persist the RDD in both memory and disk. Spark does not allow changing the storage level of an RDD that already has one, so before re-persisting we need to unpersist it using the unpersist method.

partitions1.unpersist()

First, import the StorageLevel class. Then call persist() with StorageLevel.MEMORY_AND_DISK. With this level, partitions that do not fit into RAM spill over to disk.

from pyspark import StorageLevel

# This persists our RDD in memory and on disk
partitions1.persist(StorageLevel.MEMORY_AND_DISK)

Let us check using getStorageLevel(). You can see that the result is the same as StorageLevel.MEMORY_AND_DISK.

Scala Version of Video on Persistence

Slides - Adv Spark Programming

