Adv Spark Programming

Adv Spark Programming - Understanding Persistence

Let us understand how we can improve the performance of our Spark applications using persistence or caching.

Recall that RDDs are lazily evaluated.

Whenever an action is executed on an RDD, all of the RDDs it depends on are also evaluated. This means that if an action is called twice on an RDD in the lineage graph, the same computation could happen twice.

Also, we may wish to use the same RDD multiple times.

So, to avoid recomputing an RDD, we can cache or persist it using the cache() or persist() methods provided on the RDD.

If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed.

We can also replicate our data on multiple nodes if we want to be able to handle node failure without slowdown.
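As a minimal sketch of replication, assuming a spark-shell style session: the storage levels whose names end in _2 (for example StorageLevel.MEMORY_ONLY_2) ask Spark to keep each partition on two nodes. The RDD name data and the app name are illustrative only. On a single machine there is no second node to replicate to, so this only shows how the level is requested.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Reuse the shell's context if one exists; otherwise start a local one (assumed setup).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("replication-sketch"))

// A small illustrative RDD (hypothetical data).
val data = sc.parallelize(1 to 1000, 4)

// Request in-memory storage with each partition replicated on two nodes.
data.persist(StorageLevel.MEMORY_ONLY_2)

// The assigned level records the requested replication factor.
val replicas = data.getStorageLevel.replication
```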

Let’s take a look at the following code in order to understand caching.

Create an RDD, nums, of the numbers from one to one hundred thousand with fifty partitions.

Now, say we create another RDD, partitions, by summing up the values in each partition of the nums RDD. We first define a function mysum, which sums up the values of an iterator and returns the sum as a single-value iterator. This function is then called on nums using mapPartitions.

Now, say we create another RDD, partitions1, from the partitions RDD by increasing each value by one using map.
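The steps so far can be sketched as follows, assuming a spark-shell style session. The names nums, mysum, partitions and partitions1 come from the walkthrough; incrByOne and the app name are hypothetical names chosen for illustration, and the exact code on the slides may differ.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the shell's context if one exists; otherwise start a local one (assumed setup).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("persistence-sketch"))

// An RDD of the numbers 1 to 100000, split into 50 partitions.
val nums = sc.parallelize(1 to 100000, 50)

// Sums the values of one partition and returns the sum as a single-value iterator.
def mysum(itr: Iterator[Int]): Iterator[Int] = Iterator(itr.sum)

// One element per partition of nums: the sum of that partition.
val partitions = nums.mapPartitions(mysum)

// Hypothetical helper that adds one to a value.
def incrByOne(x: Int): Int = x + 1

// Each partition sum increased by one.
val partitions1 = partitions.map(incrByOne)

// Transformations are lazy; nothing is computed until an action such as collect() runs.
val result = partitions1.collect()
```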

Every time we call an action on the partitions1 RDD, the partitions RDD is also recomputed. So, if partitions1 is going to be used frequently, this will be inefficient.

In such circumstances, we can cache partitions1 RDD.
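One way to observe the effect is to count how often the map function actually runs. The sketch below assumes a local spark-shell style session and uses an accumulator named calls, which is a hypothetical counter added only for this illustration and is not part of the original example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the shell's context if one exists; otherwise start a local one (assumed setup).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("cache-sketch"))

val nums = sc.parallelize(1 to 100000, 50)
val partitions = nums.mapPartitions(itr => Iterator(itr.sum))

// Hypothetical counter: incremented each time the map function runs.
val calls = sc.longAccumulator("map calls")
val partitions1 = partitions.map { x => calls.add(1); x + 1 }

// Without caching, each action re-runs the whole lineage.
partitions1.collect()
partitions1.collect()
val callsWithoutCache = calls.sum

// Mark partitions1 for caching; the next action computes and stores it.
partitions1.cache()
partitions1.collect()   // computes the partitions and caches them
partitions1.collect()   // read from the cache, so the map function is not re-run
val callsWithCache = calls.sum - callsWithoutCache
```

Comparing callsWithoutCache and callsWithCache shows how many map invocations the second action saved once the RDD was cached.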

We can persist an RDD using the persist method. This method takes an instance of StorageLevel as an argument. The storage level specifies how the RDD should be persisted, for example in memory or on disk. If we do not provide any argument, it defaults to memory only (StorageLevel.MEMORY_ONLY).
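A minimal sketch of the default behaviour, assuming a spark-shell style session (the app name is illustrative): calling persist() with no argument assigns the memory-only level, which is also what cache() does.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Reuse the shell's context if one exists; otherwise start a local one (assumed setup).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("persist-default-sketch"))

val partitions1 = sc.parallelize(1 to 100000, 50)
  .mapPartitions(itr => Iterator(itr.sum))
  .map(_ + 1)

// Before persisting, no storage level is assigned.
val levelBefore = partitions1.getStorageLevel

// persist() with no argument uses the default, memory-only level.
partitions1.persist()
val levelAfter = partitions1.getStorageLevel
```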

Now, let’s try to persist in both memory and disk. Before re-persisting with a different storage level, we need to unpersist the RDD. For that, we can use the unpersist method.

Now, let us import a class called StorageLevel. Then call the persist() method with StorageLevel.MEMORY_AND_DISK. With this level, if the RDD doesn’t fit into RAM, the partitions that don’t fit spill over to the disk.

Let us check using getStorageLevel. You can see that it is the same as StorageLevel.MEMORY_AND_DISK.
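The unpersist and re-persist steps can be sketched as below, assuming a spark-shell style session (the app name is illustrative). Spark rejects an attempt to change the storage level of an RDD that already has one, which is why unpersist() comes first.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Reuse the shell's context if one exists; otherwise start a local one (assumed setup).
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("repersist-sketch"))

val partitions1 = sc.parallelize(1 to 100000, 50)
  .mapPartitions(itr => Iterator(itr.sum))
  .map(_ + 1)

// First persisted with the default, memory-only level.
partitions1.persist()

// Clear the assigned level so that a different one can be set.
partitions1.unpersist()

// Persist again, this time allowing spill-over to disk.
partitions1.persist(StorageLevel.MEMORY_AND_DISK)

// Check the level that is now assigned.
val level = partitions1.getStorageLevel
val matches = level == StorageLevel.MEMORY_AND_DISK
```

How the level is printed in the shell varies between Spark versions, so comparing it with the StorageLevel constant is more reliable than comparing the printed text.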

17 Comments

Please correct the slide.

The yellow-highlighted text is an extra character printed in the slide (a. return x+1).

Hi,

Thank you for pointing this out. It was actually included to indicate that it is a separate line. However, we will definitely keep this in mind while updating our courseware.

Thanks.

1. The output of the following command differs between the ppt/video and the actual output.

partitions1.getStorageLevel /*Output in the video*/
res27: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1)

partitions1.getStorageLevel /*Output which I received*/

res2: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)

2. Excerpt from above "The storage level specifies how should the RDD be persisted - in memory or on disk for example. If we do not provide any argument, it saves in memory the memory only."
"If the RDD doesn’t fit into RAM it would spill over to the disk."

What happens if no argument is provided to persist() and the RDD doesn't fit into memory/RAM?

3. I guess the following command provides storage level of the partition.

partitions1.persist(StorageLevel.MEMORY_AND_DISK)
partitions1.getStorageLevel

What is the purpose of the following command, and which storage level does it return?

StorageLevel.MEMORY_AND_DISK

When I executed the above program without specifying the number of partitions, it created 16 partitions by default. So is 16 the default number of partitions if it is not specified?

Hi, Ajinkya.

You are correct, 16 partitions are being created. This default would have been specified in the configuration file.

Kindly refer to this:
https://discuss.cloudxlab.com/t/error-quota-exceeded-while-running-spark-job-but-havent-used-much-of-disk-space-solved/3472/3

All the best!

In the persistence example, the number of partitions is 50. On CloudxLab there are 16 processors on the machine, and we are executing this example in local mode, so how are 50 partitions being created? Do they span other machines, or are approximately 3 partitions created on each processor?

> ... this example in local mode, so how are 50 partitions being created?

Yes, in this example we are hypothetically creating an RDD with 50 partitions.

> Do they span other machines, or are approximately 3 partitions created on each processor?

It all depends on resource availability. It is something that the cluster manager decides automatically.

By default, how many partitions are there in Spark if we don't specify the number of partitions? Secondly, what would be the difference between executing a program with and without partitions in Spark?

So, to avoid re-computation of an RDD, we can cache or persist it using the function provided on RDD.

Please explain this; persistence is still not clear to me.

Hi, Amit.

When you persist an RDD, each node stores in memory any partitions of it that it computes and reuses them. So recomputation of the RDD is not necessary.

You can mark an RDD to be persisted using the persist() or cache() methods on it.

Kindly refer to the doc below for more information.

https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

All the best!

Hi Satyajit,

Let's take an example. Say I have an object "Account" consisting of "Account No, Name and Balance", which is used frequently, so a persisted RDD is created for it. The object is created from an "Account" table in some database or HDFS file. Now say a new account is added to the account table/file; how will it be reflected in the persisted RDD?

If the new record cannot be reflected, is it correct to presume that the performance gained through persistence comes at the cost of consistency?

MapReduce and Spark are designed for batch processing of humongous data. Therefore, they are great for processing big but relatively static data.

So, usually we are okay to miss the most recent changes.

If you want to process the most recent data, it would be better to design a stream processing pipeline as described in one of the chapters.

In the above example in the video, why are we caching the partitions1 RDD instead of the partitions RDD, given that the partitions RDD will be computed whenever we call an action on partitions1?

Please explain!

Also, please tell me which arguments are to be given when persisting an RDD.

In the above slide, if the variable 'nums' is created with 50 partitions, how will the data be stored in the RDD?

It will try to divide the records equally among the 50 partitions.

Can you please explain the concept of partitioning in an RDD? If we create 50 partitions as explained above, will the data be stored on 50 data nodes?

RDDs are made of partitions. Each partition contains records. When a transformation such as map or flatMap runs, it can process the partitions in parallel.
