// Re-use the existing SparkSession so that its implicits can be imported on the spark-shell
val spark2 = spark
import spark2.implicits._

// The case class defines the schema: its field names become the column names
case class Person(name: String, age: Long)

// Read the text file, split each comma-separated line, and map every record to a Person
val textRDD = sc.textFile("/data/spark/people.txt")
val arrayRDD = textRDD.map(_.split(","))
val personRDD = arrayRDD.map(attributes => Person(attributes(0), attributes(1).trim.toInt))

// Convert the RDD of case-class objects into a DataFrame and display it
val peopleDF = personRDD.toDF()
peopleDF.show()
# PySpark equivalent of the same conversion
import os
import sys

# CloudxLab-specific environment: point PySpark at the Spark 2 client and Anaconda Python
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

from pyspark.sql import Row

# Read the file, split each line on the comma, and wrap every record in a Row
textRDD = sc.textFile("/data/spark/people.txt")
arrayRDD = textRDD.map(lambda x: x.split(","))
rowRDD = arrayRDD.map(lambda arr: Row(name=arr[0], age=int(arr[1].strip())))

# Convert the RDD of Rows into a DataFrame and display it
peopleDF = rowRDD.toDF()
peopleDF.show()
Let us take a look at the first approach to converting an RDD into a DataFrame.
The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table.
The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Seqs or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.
Let's try to convert the text data into a DataFrame. The text file people.txt contains comma-separated values.
This code is available in our GitHub repository cloudxlab/bigdata.
Navigate to the folder spark/examples/dataframes and open the file rdd_to_df.scala as shown in the image.
Let me walk through the code.
Let us open the Spark shell with Spark version 2 or later. On the spark-shell, let's first import the Spark implicits.
Let's create a case class Person with two fields, name and age. We will create an object of this class for each line of the text file. Later, these field names will become the column names of the DataFrame.
Now, let's create textRDD using the textFile function on the Spark context. This creates an RDD named textRDD with each line from the file as a record. To understand what is in the RDD, you can use the take function on it, as sketched below.
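A quick sketch of such an inspection; the sample records shown in the comment are only illustrative, assuming a comma-separated name/age file:

// Peek at the first two records of the RDD
textRDD.take(2)
// would return something like Array("Michael, 29", "Andy, 30")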
Now, let's convert each line of text into an array of strings by splitting on the comma. This is easily accomplished using the map transformation with the split function. Each record of the resulting RDD is an array of two strings.
Next, we convert each record, which is an array of two strings, into an object of the Person case class. Again, we use the map transformation to turn each record into an instance of Person.
Finally, this RDD, whose records are Person objects, can easily be converted into a DataFrame by calling the toDF() function on it. In the code, peopleDF is the DataFrame that we can operate on.
We can look into this DataFrame using the show() function. You can see that Spark has inferred the column names automatically from the objects. This is done using Java's reflection capability: using reflection, a Java class can discover the attributes and methods of another object.
Once we have created the DataFrame from our unstructured data using the reflection technique over RDDs, we can run DataFrame operations as usual. We can also register it as a temporary view.
On this temporary view we can run SQL queries, as usual, using the sql function of the Spark session. Here the SQL query produces teenagersDF, a DataFrame containing the people whose age is between 13 and 19, as sketched below.
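A minimal sketch of these two steps, assuming the peopleDF built above and an illustrative view name of "people":

// Register the DataFrame as a temporary view so it can be queried with SQL
peopleDF.createOrReplaceTempView("people")

// Run a SQL query on the Spark session; the result is another DataFrame
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
teenagersDF.show()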
Also, note that we can run the usual map transformation on the DataFrame. These map transformations result in a Dataset. The function that you provide to the map transformation receives a Row object from the org.apache.spark.sql package.
Earlier we referred to a column of the row by the index 0; we can also refer to it by name, as shown in the code. Here we pass the name of the column as an argument and the datatype of the returned value in square brackets, as sketched below.
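A short sketch of both access styles on teenagersDF (the "Name: " label is just illustrative):

// Access a column of each Row by index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()

// Or access it by name, passing the expected datatype in square brackets
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()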
If we want to retrieve multiple columns of a Row at once into a Map, with the column name as the key and the column value as the value, we can use the getValuesMap function. It basically converts a Row object into a Map. We need to define a map encoder first.
This mapEncoder that we define is implicit and is used implicitly while transforming each row of the DataFrame into the values map.
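The corresponding snippet (the same code is quoted in the comments below):

// No predefined encoder exists for Dataset[Map[K, V]], so define one explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()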
You can see that it has printed the data correctly from teenagersDF. Since there is only one Row in the DataFrame, the resulting Array has just one element.
23 Comments
Kindly explain the above code once again.
Hi Mayank,
> implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
It registers the mapEncoder in Scala's implicit scope. Whenever we try to encode a Map having a String key, this encoder will be used implicitly. The implicit keyword does that.
> teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
This is internally using the mapEncoder defined above.
Hi,
I am getting the below error while running the Python code.
Hi,
Your code snippet is written in Scala. Please feel free to copy-paste the Scala code given above to be able to execute it properly.
Thanks.
Please help with this use case: I have a List[String], each string being a valid JSON string (not in a text file), which is returned by a function. I need to convert this List[String] into a DataFrame, but WITH A given SCHEMA. I have the schema in a variable and can use it directly, but I am not understanding how to use it with a List[String] to get a DataFrame.
Please note that each string in the list is a complex nested JSON string with arrays and structures.
Can anyone throw some pointers on this?
Hi Abhas,
Could you please share your JSON structure so that we can assist you better.
The excerpt: "This mapEncoder that we define is implicit and is used implicitly while transforming each row of the DataFrame into the values map."
1. implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
2. // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
Is the mapEncoder declared in the 1st statement being used in the 2nd one? If yes, how? If not, where can it be used?
> Is the mapEncoder declared in the 1st statement being used in the 2nd one?
Yes.
> implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
The implicit keyword makes the mapEncoder hook into the serialization automatically.
Hi, can you please explain this code?
val personRDD = arrayRDD.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
What is attributes, what is the => sign, what is the .trim function, and why is .toInt used?
I understood that the code converts the records of arrayRDD into instances of the Person class which we defined earlier. But I could not understand the code very clearly, how exactly it happens. Could you explain this in a stepwise manner?
Hi Pranav,
=> is lambda syntax, commonly used in many programming languages; it defines a simple inline function. Here 'attributes' is the input to the function, and the part on the right of => is the output, i.e. you get an object of the Person class in return. You may use any variable name in place of 'attributes' and it will work fine.
trim is again a common function, used to remove leading and trailing whitespace from a string, and toInt is used because age is defined as Long in the Person class while you are reading a string from the text file. See the small sketch below.
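A small, purely illustrative sketch of the same line rewritten as a named function, with a hypothetical input record:

// The anonymous function attributes => Person(...) is equivalent to:
def toPerson(attributes: Array[String]): Person =
  Person(attributes(0), attributes(1).trim.toInt)   // trim drops spaces, toInt parses, the Int widens to Long

toPerson(Array("Andy", " 30"))   // returns Person("Andy", 30)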
Thanks a ton!
Hi,
I am getting the below error while running the PySpark code.
Can you please let us know the error so that we can assist you better?
Which video? Can you share the link for the same?
How do I get access to the GitHub repository?
I did not get you. You can sign up on GitHub and clone the repository as mentioned in the videos.
In the video, the speaker says the sql function can be run on the Spark context, but in the description it is fired on the spark variable, which is the Spark session. As far as I remember, the Spark context is represented by sc. Please clarify.
Upvote ShareHi,
Could you please let me know the timeframe in the video window so that I can assist you better?
Thanks,
Abhinav
In the video it is said that 'This RDD can be implicitly converted to a DataFrame and then be registered as a view; views can be used in subsequent SQL statements.' But in the description it is written as a table. Please clarify. Thanks.
Yes, it is actually a view:
rdd.toDF().createTempView("my_view")