Adv Spark Programming

Adv Spark Programming - Persistence StorageLevel

There are various ways to persist an RDD: you can persist it in memory or on disk. "Persisting in memory" may sound strange; it essentially means caching the RDD so that we do not have to compute the entire RDD graph on every action.

To make this customizable, persist accepts an instance of the StorageLevel class as its argument. A StorageLevel is a bundle of settings, which we are going to talk about next.

You can create your own instance of StorageLevel using the constructor as shown.

The first argument is a boolean that specifies whether to use disk.

The second argument specifies whether to use memory. In the example we want to use both disk and memory; therefore, both are true.

The third argument specifies whether to use off-heap memory. Objects stored on the heap are fast to retrieve but are subject to collection by the Java garbage collector.

The fourth argument specifies whether to persist the RDD deserialized. Setting it to false stores the RDD as serialized Java objects, one byte array per partition. This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

The last argument specifies how many copies we want. For critical RDDs we can keep multiple replicas of the RDD partitions in order to avoid recomputation when machines fail. The default number of replicas is 1, and the value must be less than 40.

Finally, when calling persist, we pass the StorageLevel object as the argument.
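
The argument order described above can be sketched in plain Python. The StorageLevelSketch class below is a hypothetical stand-in written for illustration, not Spark's own class; it only mirrors the five flags in the order the text describes (PySpark's pyspark.StorageLevel takes the same positional order, with replication defaulting to 1). The range check assumes that Spark requires the replication value to be below 40.

```python
from typing import NamedTuple


class StorageLevelSketch(NamedTuple):
    """Hypothetical stand-in that mirrors the five StorageLevel flags."""
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1  # one copy unless stated otherwise

    def validate(self) -> "StorageLevelSketch":
        # Assumption: Spark accepts fewer than 40 replicas.
        if not 1 <= self.replication < 40:
            raise ValueError("replication must be between 1 and 39")
        return self


# Disk and memory both enabled, on-heap, serialized, two replicas.
custom_level = StorageLevelSketch(True, True, False, False, 2).validate()

# Leaving out the last argument keeps the default of one copy.
single_copy = StorageLevelSketch(True, True, False, True).validate()

print(custom_level)
print(single_copy.replication)
```

With PySpark available, the corresponding call would be rdd.persist(StorageLevel(True, True, False, False, 2)), where rdd stands for an RDD you have already created.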

There are predefined shortcuts for storage levels covering various use cases, which you can use as per your requirements without creating StorageLevel objects yourself.

These shortcuts, such as the one shown in the slide, are nothing but static StorageLevel objects. Let's understand the various predefined storage levels and when to use each.

It persists the RDD in memory so that it is not recomputed on every action in an application.

MEMORY_ONLY

Stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

MEMORY_AND_DISK

Stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, stores the partitions that don't fit on disk, and reads them from there when they're needed.

MEMORY_ONLY_SER

Stores the RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

MEMORY_AND_DISK_SER

Similar to MEMORY_ONLY_SER, but spills partitions that don't fit in memory to disk, instead of recomputing them on the fly each time they're needed.

DISK_ONLY

Stores the RDD partitions only on disk.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

Same as the levels above, but replicates each partition on two cluster nodes.

OFF_HEAP (experimental)

Store RDD in a serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory.
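
As a reading aid, the predefined levels described above can be written out as the flag combinations they stand for. The dictionary below is a plain-Python sketch, not Spark code. Each tuple follows the constructor order (use disk, use memory, use off-heap, deserialized, replication), and the describe helper is a hypothetical name introduced for this example. OFF_HEAP is left out because its flags depend on the Spark version.

```python
# (use_disk, use_memory, use_off_heap, deserialized, replication)
SHORTCUTS = {
    "MEMORY_ONLY":         (False, True,  False, True,  1),
    "MEMORY_AND_DISK":     (True,  True,  False, True,  1),
    "MEMORY_ONLY_SER":     (False, True,  False, False, 1),
    "MEMORY_AND_DISK_SER": (True,  True,  False, False, 1),
    "DISK_ONLY":           (True,  False, False, False, 1),
    "MEMORY_ONLY_2":       (False, True,  False, True,  2),
    "MEMORY_AND_DISK_2":   (True,  True,  False, True,  2),
}


def describe(name: str) -> str:
    """Turn a shortcut name into a one-line description of its flags."""
    disk, memory, _off_heap, deserialized, replication = SHORTCUTS[name]
    places = [p for p, on in (("memory", memory), ("disk", disk)) if on]
    form = "deserialized" if deserialized else "serialized"
    return f"{name}: {' + '.join(places)}, {form}, {replication}x"


for shortcut in SHORTCUTS:
    print(describe(shortcut))
```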

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

If your RDDs fit comfortably with the default storage level, which is MEMORY_ONLY, leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.
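
The selection process above can be read as a small decision procedure. The function below is one possible interpretation in plain Python, not part of Spark: the name choose_storage_level and its boolean parameters are hypothetical, and it returns only the name of a storage level as a string.

```python
def choose_storage_level(fits_deserialized: bool,
                         fits_serialized: bool,
                         recompute_is_expensive: bool,
                         need_fast_recovery: bool = False) -> str:
    """Return the name of a storage level following the steps above."""
    if fits_deserialized:
        level = "MEMORY_ONLY"          # most CPU-efficient option
    elif fits_serialized:
        level = "MEMORY_ONLY_SER"      # trade some CPU for less space
    elif recompute_is_expensive:
        level = "MEMORY_AND_DISK_SER"  # spill to disk instead of recomputing
    else:
        level = "MEMORY_ONLY_SER"      # recomputing may be as fast as disk
    if need_fast_recovery:
        level += "_2"                  # replicated variant of the same level
    return level


print(choose_storage_level(True, True, False))
print(choose_storage_level(False, False, True))
print(choose_storage_level(True, True, False, need_fast_recovery=True))
```

The answers to the three questions still have to come from measuring your own job; the function only encodes the order in which the text asks them.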

13 Comments

Hi Team,

While creating StorageLevel object, we have fourth argument deserialized. By setting it to false, Java objects are serialized.

In case of PySpark, are Python objects serialized? 

Regards
Sairam Srinivas. V

Hi,

PySpark supports custom serializers for transferring data.

Thanks.

Hi Team, how would we know whether a dataset fits under MEMORY_ONLY, given that this is the default persistence option? As you said, with the default option the data is persisted in memory only, and if the data doesn't fit into memory, that part of the data will be recomputed. So the question is: how would we know whether the data fits into memory? Can you please give an example of how to check the size of the dataset that we are going to persist? Only if we know this can we decide which storage level to use, right?

> So here question is how would we know if the data fit into memory or not.. 

Good question. There is no clear way of figuring this out, other than perhaps by looking at error messages.

A better way would be to base the decision on the size of the dataset you are going to load and then persist in memory.
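
For a rough estimate, one can compare the expected dataset size with the memory Spark sets aside for execution and storage. The sketch below is back-of-the-envelope arithmetic in plain Python, not a Spark API. It assumes the defaults of Spark's unified memory model (300 MB reserved per executor and spark.memory.fraction set to 0.6); the function names, executor size, executor count, and dataset sizes are made-up values for illustration.

```python
RESERVED_MB = 300          # assumed fixed reservation per executor
MEMORY_FRACTION = 0.6      # assumed default of spark.memory.fraction


def unified_memory_mb(executor_heap_mb: float) -> float:
    """Memory per executor shared by execution and storage (cached data)."""
    return (executor_heap_mb - RESERVED_MB) * MEMORY_FRACTION


def fits_in_memory(dataset_mb: float, executor_heap_mb: float,
                   num_executors: int) -> bool:
    """Optimistic check: assumes caching may use the whole unified region."""
    return dataset_mb <= unified_memory_mb(executor_heap_mb) * num_executors


# Hypothetical cluster: 4 executors with a 4 GB heap each.
print(unified_memory_mb(4096))
print(fits_in_memory(6000, 4096, 4))
print(fits_in_memory(12000, 4096, 4))
```

Deserialized objects often take more space in memory than the same data does on disk, so treat the result as a first guess. After caching, the Storage tab of the Spark web UI shows how much of the RDD was actually cached.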

Thanks for the reply. Understood.

What if the replication factor needs to be more than 2? Can it be specified with MEMORY_ONLY_2 or MEMORY_AND_DISK_2?

In that case, you can create an object of StorageLevel and use that instead of using the constants.

1. Why is the partition stored by default as a serialized object for StorageLevel.DISK_ONLY, whereas it is stored deserialized for StorageLevel.MEMORY_AND_DISK?

2. The default replication value is 1. Does it mean only 1 copy of the RDD is created, or 2 copies (the original and a second one for replication)? If it is the former, then there is no replication, and there will be an error or no output if that particular node goes down.

3. The description of StorageLevel.OFF_HEAP says "Store RDD in serialized format in Tachyon." Does Spark have built-in APIs for Tachyon? Can any other in-memory databases be used with Spark?

4. In above text "TODO::??" may need to be replaced by following:
OFF_HEAP (experimental)

Store RDD in a serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory.

> Why is the partition stored by default as a serialized object for StorageLevel.DISK_ONLY, whereas it is stored deserialized for StorageLevel.MEMORY_AND_DISK?

It is a design decision from Spark's side. Here are my thoughts on it. To keep the data from memory into disk either we could convert it into a serialized format or deserialized or raw. Serialized Format means the data is such that it can be loaded on another machine or at a later point in time. Deserialized format is basically a kind of paging or object format that is fast to save and load but not possible to transfer and use later because it might have the memory addresses mapping.  The deserialized format is slower to persist.

So, when you explicitly signal to Spark that you want DISK_ONLY, it assumes that you are looking for later use of the persisted data, not performance.

> The default replication value is 1. Does it mean only 1 copy of RDD is created or 2 copies (one original and 2nd for replication)? If it is the former one then there is no replication since there will be an error/no output if that particular node goes down.

By default, there is only 1 copy. If a node doing computation on a partition goes down in the middle of the task, the RDD partition will be recomputed from the beginning. There is no data loss per se, only extra latency. Since most of these tasks are usually batch processes, it is okay to let them take more time. I think this is the thought process behind the design.

> In StorageLevel.OFF_HEAP is mentioned that "Store RDD in serialized format in Tachyon.". Does Spark have in built APIs for Tachyon?

It did.

> Can any other in-memory databases be used with Spark?

Yes, there is an example of MongoDB integration in the course.

> 4. In above text "TODO::??" may need to be replaced by following:

Thank you.

please explain deserialized Java object and serialized Java objects

Hi, Amit. 

Serialization is the process of saving an object's state to a sequence of bytes, which then can be stored on a file or sent over the network, and deserialization is the process of reconstructing an object from those bytes. 

When these objects are serialized using Java's built-in mechanism ("java.io.Serializable"), the result is called a serialized Java object; the object rebuilt from those bytes is a deserialized Java object.
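
The same idea can be tried in Python with the standard pickle module, as a small sketch of what "one byte array per partition" means. The partition list below is a made-up stand-in for the records of a single RDD partition; this is not Spark code.

```python
import pickle

# Hypothetical records that one partition might hold.
partition = [("alice", 31), ("bob", 27), ("carol", 45)]

# Serialization: the whole partition becomes a single byte string.
serialized = pickle.dumps(partition)
print(type(serialized).__name__, len(serialized), "bytes")

# Deserialization: rebuild equal objects from those bytes.
restored = pickle.loads(serialized)
print(restored == partition)
```

The serialized form is compact and can be written to a file or sent over the network, but it has to be turned back into objects before the records can be used, which is the extra CPU cost mentioned for the serialized storage levels.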

Kindly refer to this doc for more information.

https://docs.oracle.com/javase/tutorial/jndi/objects/serial.

All the best!

Hello Sir,
Two points are not clear to me:
1) What are deserialized Java objects?
2) What are serialized Java objects (one byte array per partition)?

Can you provide more information or a reference?

Sir, what if we need a replication value of 1 without mentioning it in the code? What would the code look like?
