
Spark SQL - Infer Schema Using Reflection

Code

Scala

// Re-assign the SparkSession to a stable identifier so its implicits can be imported on the shell
val spark2 = spark
import spark2.implicits._

// The case class defines the schema: its field names become the column names
case class Person(name: String, age: Long)

// Read the file: one line per record
val txtRDD = sc.textFile("/data/spark/people.txt")
// Split each line on the comma into an array of strings
val arrayRDD = txtRDD.map(_.split(","))
// Convert each array into a Person object (the Int age is widened to Long)
val personRDD = arrayRDD.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
// Convert the RDD of case-class objects into a DataFrame and display it
val peopleDF = personRDD.toDF()
peopleDF.show()

Python

import os
import sys

# Environment-specific setup for the CloudxLab/HDP cluster; adjust these paths for your own setup
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

from pyspark.sql import Row

# Read the file: one line per record
textRDD = sc.textFile("/data/spark/people.txt")

# Split each line on the comma into a list of strings
arrayRDD = textRDD.map(lambda x: x.split(","))

# Build a Row per record; the keyword argument names become the column names
rowRDD = arrayRDD.map(lambda arr: Row(name=arr[0], age=int(arr[1].strip())))

# Convert the RDD of Rows into a DataFrame and display it
peopleDF = rowRDD.toDF()
peopleDF.show()

Let us take a look at the first approach to converting an RDD into a DataFrame.

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table.

The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Seqs or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.
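
For instance (a hypothetical sketch, not part of the course code), a nested case class would be inferred as a struct column and a Seq field as an array column:

case class Address(city: String, zip: String)
case class Contact(name: String, age: Long, address: Address, phones: Seq[String])
// An RDD of Contact objects could be converted with toDF() just like the Person example,
// yielding a struct column for address and an array<string> column for phones.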

Let's try to convert the text data into a DataFrame. The text file people.txt contains comma-separated values.

This code is available in our GitHub repository cloudxlab/bigdata.

Navigate to the folder spark/examples/dataframes and open the file rdd_to_df.scala as shown in the image.

Let me walk through the code.

Let us open the Spark shell with Spark version 2 or later. On the spark-shell, let's first import the Spark implicits.

Let's create a case class Person. It has two fields, name and age. We will create an object of this class for each line of the text file. Later, the field names will become the column names of the DataFrame.

Now, let's create txtRDD using the textFile function on the Spark context (sc). This creates an RDD named txtRDD with each line of the file as a record. To see what is in the RDD, you can use the take function on txtRDD, as shown below.
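
A quick check (a minimal sketch; the commented values assume the file matches the standard Spark people.txt sample):

txtRDD.take(2).foreach(println)
// Michael, 29
// Andy, 30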

Now, let's convert each line of text into an array of strings by splitting on the comma. This is easily accomplished using the map transformation with the split function. The resulting RDD has an array of two strings in each record.

Next, we convert each record, an array of two strings, into an object of the Person case class. Again, we use the map transformation to turn each record into an instance of the Person class.

Finally, this RDD, whose records are Person objects, can easily be converted into a DataFrame by calling toDF() on it. In the code, peopleDF is the DataFrame which we can operate on.

We can look into this DataFrame using the show() function. You can see that Spark has inferred the column names automatically from the objects. This is done using the reflection capability of the JVM: using reflection, a program can discover the attributes and methods of an object at runtime.

Once we have created the DataFrame from our unstructured data using the reflection technique over RDDs, we can run DataFrame operations as usual. We can also register it as a temporary view, as shown below.
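
For example, using the standard createOrReplaceTempView API (the view name people is just an illustration):

peopleDF.createOrReplaceTempView("people")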

On this temporary view, we can run a SQL query, as usual, using the sql function of the SparkSession. Here the SQL query produces teenagersDF, a DataFrame containing the people whose age is between 13 and 19.
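
A sketch of such a query, assuming the view was registered as people above:

val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
teenagersDF.show()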

Also, note that we can run the usual map transformation on the DataFrame. These map transformations result in a Dataset (a DataFrame is simply a Dataset of Row objects). The function that you provide to the map transformation receives a Row object from the org.apache.spark.sql package.
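
For instance, a minimal sketch that builds a string from the first column of each Row by index:

// map over a DataFrame receives org.apache.spark.sql.Row objects;
// the String encoder comes from the spark.implicits._ import above
teenagersDF.map(teenager => "Name: " + teenager(0)).show()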

Earlier we referred to a column of the row by its index 0; we can also refer to it by name using getAs, as shown in the code. Here we pass the name of the column as an argument and the data type of the returned value in square brackets.
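
The same lookup by column name, using Row's getAs method:

teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()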

If we want to retrieve multiple columns of a Row at once into a Map, with the column name as the key and the column's value as the value, we can use the getValuesMap function. It basically converts a Row object into a Map. We need to define a map encoder first.

The mapEncoder we define is implicit and is used implicitly while transforming each row of the DataFrame into the values map, as in the sketch below.
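
A sketch of this step, following the standard Spark SQL example (the commented output assumes the sample data described above, where only one person is a teenager):

// No pre-defined encoder exists for Map[K, V], so we define one explicitly using Kryo
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Each Row becomes a Map keyed by column name
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))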

You can see that it has printed the data correctly from teenagersDF. Since there is only one row in the DataFrame, the resulting Array has just one element.


