DataFrames, Spark SQL, R


Spark SQL - Infer Schema Using Reflection

Code

Scala

val spark2 = spark
import spark2.implicits._   // needed for the toDF() conversion below

// The field names of the case class become the column names of the DataFrame
case class Person(name: String, age: Long)

val txtRDD = sc.textFile("/data/spark/people.txt")   // one record per line
val arrayRDD = txtRDD.map(_.split(","))              // split each line on the comma
val personRDD = arrayRDD.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
val peopleDF = personRDD.toDF()                      // schema inferred via reflection
peopleDF.show()

Python

import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

from pyspark.sql.types import Row

# One record per line of the file
textRDD = sc.textFile("/data/spark/people.txt")

# Split each line on the comma
arrayRDD = textRDD.map(lambda x: x.split(","))

# Build a Row per record; the keyword names become the column names
rowRDD = arrayRDD.map(lambda arr: Row(name=arr[0], age=int(arr[1].strip())))

# Infer the schema from the Row objects and create a DataFrame
peopleDF = rowRDD.toDF()
peopleDF.show()

Let us take a look at the first approach to converting an RDD into a DataFrame.

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table.

The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Seqs or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.
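For instance, here is a minimal sketch showing that nested case classes and Seqs also get their schema inferred via reflection (the Employee and Address classes are illustrative and not part of the lesson code; it relies on the same implicits import as above):

case class Address(city: String, zip: String)
case class Employee(name: String, address: Address, phones: Seq[String])

val employeesDF = Seq(Employee("Ravi", Address("Bengaluru", "560001"), Seq("99999-00000"))).toDF()
employeesDF.printSchema()
// address becomes a struct column and phones becomes an array column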

Let's try to convert the text data into a DataFrame. The text file people.txt contains comma-separated values.

This code is available in our GitHub repository cloudxlab/bigdata.

Navigate to the folder spark/examples/dataframes and open the file rdd_to_df.scala as shown in the image.

Let me walk through the code.

Let us open the Spark shell with Spark version 2 or later. On the spark-shell, let's first import the Spark implicits.

Let's create a case class Person. It has two fields, name and age. We will create an object of this class for each line of the text file. Later, the field names will become the column names of the DataFrame.

Now, let's create txtRDD using the textFile function on the Spark context (sc). This creates an RDD named txtRDD with each line of the file as a record. To see what is in the RDD, you can use the take function on txtRDD.
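For example (the output shown is a sketch, assuming the standard people.txt that ships with the Spark examples):

txtRDD.take(2)
// e.g. Array("Michael, 29", "Andy, 30")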

Now, let's convert each line of text into an array of strings by splitting on the comma. This is easily accomplished using the map transformation with the split function. Each record of the resulting RDD is an array of two strings.

Next, we convert each record, an array of two strings, into an object of the Person case class. Again, we use the map transformation to turn each record into an instance of Person.

Finally, this RDD having Person objects as records can easily be converted into a DataFrame using a function called toDF() on the RDD. In the code, peopleDF is the DataFrame which we can operate on.

We can look into this DataFrame using the show() function. You can see that Spark has inferred the column names automatically from the objects. This is done using the reflection ability of Java: using reflection, a Java class can discover the attributes and functions of another object.

Once we have created the DataFrame from our unstructured data using the reflection technique over RDDs, we can run DataFrame operations as usual. For example, we can register it as a temporary view, as shown below.
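A minimal sketch (the view name people is illustrative):

peopleDF.createOrReplaceTempView("people")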

And on this temporary view, we can run a SQL query, as usual, using the sql function of the Spark session. Here the SQL query results in teenagersDF, which is a DataFrame containing the people whose age is between 13 and 19.
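A sketch of such a query, assuming the view registered above:

val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
teenagersDF.show()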

Also, note that we can run the usual map transformation on the DataFrame. These map transformations result in a new Dataset. The function that you provide to the map transformation receives a Row object from the org.apache.spark.sql package.
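For example, accessing the first column of each Row by its index (a sketch built on the teenagersDF above):

teenagersDF.map(teenager => "Name: " + teenager(0)).show()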

Earlier we referred to a column of the row by its index (0); we can also refer to it by name, as shown below. Here we pass the name of the column as an argument and specify the data type of the returned value in square brackets.
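That is, using getAs with the column name and the expected type (a sketch on the same teenagersDF):

teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()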

If we want to retrieve multiple columns of a Row at once into a Map, with the column name as the key and the column value as the value, we can use the getValuesMap function. It basically converts a Row object into a Map. We need to define a MapEncoder first.

This mapEncoder which we are defining is implicit and is going to be used implicitly while transforming each row of the DataFrame into the values map.
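Here is the snippet being described (the same code is quoted in the comments below):

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))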

You can see that it has printed the data correctly from teenagersDF. Since there is only one Row in the DataFrame, the resulting Array has just one element.

Spark - Dataframes & Spark SQL (Part1)



23 Comments

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

Kindly explain the above code once again.


Hi Mayank,

> implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]

It registers the mapEncoder with Scala. Whenever we try to encode a map having a string as the key, this encoder will be used implicitly. The implicit keyword does that.

> teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()

This is internally using the mapEncoder defined above.

 


Hi,

I am getting the below error while running the Python code.


Hi,

Your code snippet is coded in Scala. Please feel free to copy-paste the Scala code given above to be able to execute it properly.

Thanks.


Please help with this use case: I have a List[String], each string being a valid JSON string (not in a text file), which is returned by a function. I need to convert this List[String] into a DataFrame but WITH A given SCHEMA. I have the schema in a variable and can use it directly. But I am not understanding how to use it with a List[String] to get a DataFrame.

 


Please note that each string in the list is a complex nested JSON string with arrays and structures.


Can anyone throw some pointers on this?


Hi Abhas,

Could you please share your JSON structure so that we can assist you better.


The excerpt "This mapEncoder which we are defining is implicit and is going to be used implicitly while transforming each row of a data frame into the values map."

1. implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]

mapEncoder = class[value[0]: binary]

2. // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()

Is the mapEncoder declared in the 1st statement being used in the 2nd one? If yes, how? If no, where can it be used?


> Is the mapEncoder declared in the 1st statement being used in the 2nd one?

Yes.

> implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]

The implicit keyword makes mapEncoder hook into the serialization.


Hi, can you please explain this code?

val personRDD = arrayRDD.map(attributes => Person(attributes(0), attributes(1).trim.toInt))

What is attributes, the => sign, and the .trim function? Why is .toInt used?
I understood that the code converts the records of arrayRDD into instances of the Person class which we defined earlier. But I could not understand the code very clearly, how exactly it happens. Could you explain this in a stepwise manner?


Hi Pranav,

=> This is lambda syntax, commonly used in many programming languages; it is a simple inline function. Here 'attributes' is the input to the function and the part on the right of => is the output, i.e. you will be getting an object of the Person class in return. You may use any variable name in place of 'attributes' and it will work fine.

trim is again a common function in any framework, used to remove leading or trailing whitespace from a string, and toInt is used because age is defined as Long in the Person class while you are getting a String from the text file.
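To make it concrete, here is the same mapping rewritten with a named function instead of the lambda (an illustrative sketch; toPerson is not part of the lesson code):

def toPerson(attributes: Array[String]): Person = {
  val name = attributes(0)            // e.g. "Justin"
  val age = attributes(1).trim.toInt  // " 19" -> "19" -> 19, widened to Long
  Person(name, age)
}
val personRDD2 = arrayRDD.map(toPerson)   // same result as arrayRDD.map(attributes => Person(...))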


Thanks a ton!



Hi,

I am getting the below error while running the PySpark code.


Can you please let us know the error so that we can assist you better.


Which video? Can you share the link for the same?


How do I get access to the GitHub repository?


I did not get you. You can sign up on GitHub and clone the repository as mentioned in the videos.


In the video, the speaker says the sql function can be run on the Spark context, but in the description it is called on the spark variable, which is the Spark session. As far as I remember, the Spark context is represented by sc. Please clarify.


Hi,

Could you please let me know the timeframe in the video so that I can assist you better.

Thanks,

Abhinav


In the video it is said that 'This RDD can be implicitly converted to a DataFrame and then be registered as a view; views can be used in subsequent SQL statements.' But in the description it is written as a table. Please clarify. Thanks.


Yes, it is actually a view:
rdd.toDF().createTempView("people")
