In the previous, reflection-based method of converting an RDD into a DataFrame, we created a case class with the required fields.
But if we don't know the number of fields or their datatypes at the time of coding, or we want to write a generic program that can convert various kinds of RDDs into DataFrames, we need to specify the schema programmatically.
In other words, we use this approach when case classes cannot be defined ahead of time (for example, when the structure of records is encoded in a string, or when a text dataset will be parsed and fields will be projected differently for different users).
In such cases, we can programmatically create a DataFrame with three steps.
Create an RDD of Rows from the original RDD;
Then create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step 1;
Finally, apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
Let us take a look at programmatically specifying the schema.
Say we have a file people.txt containing comma-separated data, and we are provided the schema details dynamically at runtime in a string called schemaString. The schemaString holds the column names separated by spaces. Right now it has two columns, name and age, but in future it could have more or fewer columns.
Let's get started with creating DataFrames by programmatically specifying the schema. The first step is to import the required classes from the sql and sql.types packages: we get the Row object from the sql package and StructType from the sql.types package.
Also, let's define schemaString as "name age" (the two column names separated by a space, inside double quotes). In real scenarios we would be reading it from somewhere at runtime, and it might also contain the datatype of each field. For now, we are going to assume that both columns are of String datatype.
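Put together, the setup so far looks something like the following sketch (assuming the Spark shell, where the SparkSession is already available as spark):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// In a real scenario this string would be read from somewhere at runtime;
// here it is hard-coded for the walkthrough.
val schemaString = "name age"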
Let's first split the schemaString by space so that the resulting array has one column name per element. Then let's create a StructField for each element of the array, i.e. for each column. Here we are using the "map" method provided by Scala (not Spark) on the collection.
So we have created a variable named fields, which is an array of StructField objects. Wrapped in a StructType, this array becomes the schema.
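A sketch of this step, reusing the fieldsArray and fields names that also appear in the discussion below (the StructType wrapper is what createDataFrame expects as the schema):

// Split the schema string into individual column names
val fieldsArray = schemaString.split(" ")

// Build one StructField per column; every column is assumed to be a String here
val fields = fieldsArray.map(name => StructField(name, StringType, nullable = true))

// Wrap the array of StructFields in a StructType -- this is the schema object
val schema = StructType(fields)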
The next step is to create the RDD as usual. Here we are creating the RDD from people.txt located in the /data/spark folder in HDFS, so peopleRDD is made up of records where each record is one line of the file.
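A sketch of this step, again assuming the Spark shell where spark is the SparkSession:

// Each element of peopleRDD is one line of the file
val peopleRDD = spark.sparkContext.textFile("/data/spark/people.txt")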
Now let's break each line of text at the comma and then create Row objects. Row objects are like arrays: they hold the values but carry no information about the column names.
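A sketch of this step, assuming each line holds exactly two comma-separated values (name and age):

// Split each line on the comma and wrap the values in a Row;
// trim removes the space that follows the comma in people.txt
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))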
Using the schema and the RDD of Rows, we can easily create the DataFrame with the createDataFrame method. This method takes the rowRDD as the first argument and the schema as the second argument.
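A sketch of this step, using the rowRDD and schema values built above:

// Apply the schema to the RDD of Rows
val peopleDF = spark.createDataFrame(rowRDD, schema)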
The DataFrame created here is named peopleDF and is ready for various operations. Let's take a look at it using the show method. You can see that the DataFrame has been created programmatically. You can also register it as a view and run SQL queries on it.
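For example (the view name peopleView below is just an illustration, not from the original lesson):

peopleDF.show()

// Register the DataFrame as a temporary view and query it with SQL
peopleDF.createOrReplaceTempView("peopleView")
spark.sql("SELECT name, age FROM peopleView").show()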
24 Comments
I remember that in Databricks Spark we have an option inferSchema = true. Is there anything like that in Apache Spark where, instead of us giving the names of the columns, it automatically reads them from the file?
Hi,
I think you would find this useful:
https://stackoverflow.com/questions/56927329/spark-option-inferschema-vs-header-true/56933052
Let me know if this resolves your query.
Thanks.
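As a short illustration of what that link describes (a sketch only; the file path is made up), the CSV reader can pick up both the column names and the column types from the file itself:

// header = true takes the column names from the first line of the file;
// inferSchema = true samples the data to guess each column's type
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/spark/people.csv")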
Code is provided for both Python and Scala. Which is preferred/recommended, and for what purpose?
Hi,
Good question!
You can use both Python and Scala, however, your usage of either of these programming languages depends on the client requirement. Scala was originally used, however, Python is also getting traction. So it's best to learn both, if possible.
Thanks.
Can schemaString be explained before this video? It would give more clarity.
I don't know the column names, and those columns will be processed at runtime, so how can I create the schema? Here you have encoded the two columns as a string. Can you please let me know?
How do we get this "schemaString"? I understand the schemaString might be in an encoded format, but which components provide it to us, when, and how? Do you have a code snippet from a real-world project to support this? (not code from the Apache documentation, please)
Hi,
When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), you can create the schema of the DataFrame dynamically so that different users see different columns.
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
Kindly refer to the doc for reference: https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#programmatically-specifying-the-schema
All the best!
val fields = fieldsArray.map(
name => StructField(name, StringType, nullable = true)
)
Why are we only considering name here? How is age defined in the whole process?
// The schema is encoded in a string
val schemaString = "name age"
Hi @sandeepgiri, @Satyajit, @Abhinav,
Suppose I don't know the column names and those columns will be processed at runtime; then how can I create the schema? Here you have encoded the two columns as a string. Can you please let me know?
Thanks,
Kunal
How are we getting the schemaString variable of column names with spaces dynamically at runtime? Who is providing this?
Can you elaborate on that?
Hi, Siddharth.
When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), you can create the schema of the DataFrame dynamically so that different users see different columns.
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
Kindly refer to the doc for reference: https://spark.apache.org/do...
All the best!
-- Satyajit Das
Hi Satyajit,
You are not elaborating on the query.
This is the information already given in the video. This actually doesn't answer my query still.
I know that when case classes cannot be defined ahead of time, that's when we build the schema programmatically.
I also know how to create the schema programmatically if I have a schemaString. Neither that nor the code needs clarification.
My question is: in real scenarios or projects, how do we get this "schemaString"? I understand the schemaString might be in an encoded format, but which components provide it to us, when, and how? Do you have a code snippet from a real-world project to support this? (not code from the Apache documentation, please)
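One hypothetical illustration (not from the course or from any specific project): if an upstream job writes the space-separated column names into a small side file next to the data, the schema string can simply be read at runtime before the data is parsed:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// The side file and its path are purely illustrative
val schemaString = spark.sparkContext.textFile("/data/spark/people_schema.txt").first()

val schema = StructType(
  schemaString.split(" ").map(name => StructField(name, StringType, nullable = true))
)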
Hi,
I tried to create a different schema string:
import org.apache.spark.sql.types._
import org.apache.spark.sql._
val schemaString1 = "name age1 age2"
val schema1 = StructType(schemaString1.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))
val peopleRDD = spark.sparkContext.textFile("/data/spark/people.txt")
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
val rowRDD1 = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim,attributes(2).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema1)
val peopleDF1 = spark.createDataFrame(rowRDD1, schema1)
peopleDF.show()   // error
peopleDF1.show()  // error
It looks like the schemaString should be known beforehand. This example is meant for cases where case classes cannot be defined ahead of time, but then the schemaString can also not be known ahead of time.
Then what is the benefit of schemaString over case classes?
Upvote ShareI have the same question as well.
Why have we not created a StructField for age like we did for name?
StructField(f, StringType, nullable = true) (here, why do we have only StringType?)
Hi Harry,
This command will create separate StructField for both the name and age.
val fields = fieldsArray.map(
name => StructField(name, StringType, nullable = true)
)
The variable ‘name’ in the code fragment above is a totally separate thing from the field ‘name’ in the schema string.
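A quick sketch to see this (assuming the StructField and StringType imports shown earlier in the lesson):

val fieldsArray = "name age".split(" ")
val fields = fieldsArray.map(name => StructField(name, StringType, nullable = true))

// fields now holds one StructField for name and one for age
fields.foreach(println)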
Thanks
Hi, I have converted a CSV file to a Parquet file, and I want to create a DataFrame from that Parquet file as it is far more optimized than an RDD in terms of performance. The first line of the CSV file is the header. How do I create a DataFrame from the Parquet file with that first line as the header?
In the schema definition there is no mention of the age field; how is it passed to the DataFrame?
It is there:
schemaString = "name age"
In the program there is no definition for the age??
Age is also inferred as String type... everything is a String!
I request some guide or slides using Python. I am not well versed in Java.
Hi @sunilsurya,
You can check the Spark documentation for the same. Spark's Python APIs are similar to Java and Scala.
Hope this helps.
Thanks