Spark SQL - Converting RDD to Dataframe Using Programmatic Schema

In the previous, reflection-based method of converting an RDD into a DataFrame, we created a case class with various fields.

But if we don't know the number of fields and their data types at coding time, or we want to write a generic program that can convert various kinds of RDDs into DataFrames, we need to specify the schema programmatically.

In other words, case classes sometimes cannot be defined ahead of time: for example, when the structure of the records is encoded in a string, or when a text dataset will be parsed and its fields projected differently for different users.

In such cases, we can create a DataFrame programmatically in three steps.

1. Create an RDD of Rows from the original RDD.

2. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step 1.

3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.

Let us take a look at programmatically specifying the schema.

Say we have a file, people.txt, containing comma-separated data, and the schema details are provided dynamically at runtime in a string, schemaString. The schemaString holds the column names separated by spaces. Right now it has two columns, name and age, but in the future it could have more or fewer.

Let's get started creating DataFrames by programmatically specifying the schema. The first step is to import the classes from the org.apache.spark.sql and org.apache.spark.sql.types packages. We get the Row class from the sql package and StructType from the sql.types package.

Next, let's define schemaString as "name age". In real scenarios, we would read it from somewhere at runtime, and it might also carry the data type of each field. For now, we simply assume that both columns are of String type.

Let's first split the schemaString by space so that each element of the resulting array is a column name. Now, let's create a StructField for each element of the array, that is, for each column. Here we are using the map method that Scala, not Spark, provides on collections.

So we have created a variable named fields, which is an array of StructField objects. Wrapping this array in a StructType gives us the schema.
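The schema-building steps described above can be sketched as follows. This is a minimal illustration, assuming the Spark SQL library is on the classpath (as it is in spark-shell) and that every column is a nullable String, as the lesson assumes; the variable name schema is our own choice.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Column names arrive as one space-separated string.
// Hard-coded here; in practice it would be read at runtime.
val schemaString = "name age"

// Plain Scala: split into Array("name", "age"), then map each
// column name to a StructField. Every column is assumed to be a nullable String.
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))

// Wrap the array of fields in a StructType to obtain the schema object.
val schema = StructType(fields)

// Print the column names held by the schema.
println(schema.fieldNames.mkString(", "))
```

Because the schema is derived from the string, adding or removing a column name in schemaString changes the schema without touching the rest of the code.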

The next step is to create the RDD as usual. Here we create the RDD from people.txt, located in the /data/spark folder in HDFS. peopleRDD is made up of records, where each record is a line.

Now, let's split each line of text on the comma and create Row objects. Row objects are like arrays: values are accessed by position, and a Row built this way carries no information about the column names.
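To make the array-like nature of Row concrete, here is a small sketch that builds Rows from a local Scala Seq rather than from an RDD, so it runs without a cluster. The two sample lines are illustrative stand-ins, not the actual contents of the course's people.txt; the same two map calls would be applied to peopleRDD.

```scala
import org.apache.spark.sql.Row

// Stand-in for two lines of people.txt (illustrative values only).
val lines = Seq("Michael, 29", "Andy, 30")

// Split each line on the comma and build a Row from the pieces.
// trim removes the space that follows the comma.
val rows = lines.map(_.split(",")).map(attrs => Row(attrs(0), attrs(1).trim))

val first = rows.head
println(first.length)        // number of values held by the Row
println(first.getString(0))  // values are read by position, like an array
println(first.schema)        // schema attached to this Row, if any
```

A Row created with Row(...) holds only values; the column names come from the schema that is applied later.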

Using the schema and the RDD of Rows, we can easily create the DataFrame with the createDataFrame method. This method takes rowRDD as the first argument and the schema as the second.

The DataFrame created here is named peopleDF and is ready for various operations. Let's take a look at it using the show method. You can see that the DataFrame has been created successfully. You can also register it as a view and run SQL queries on it.
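The whole walkthrough can be put together in one sketch. It rests on a few assumptions: a local SparkSession is built so that the code is self-contained (in spark-shell, a session named spark already exists); the input lines are an in-memory stand-in for people.txt with illustrative values; and the view name people and the variable names schema, rowRDD, and names are our own.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Local session for a self-contained run; spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .appName("ProgrammaticSchema")
  .master("local[*]")
  .getOrCreate()

// Schema built from the runtime string; all columns assumed to be Strings.
val schemaString = "name age"
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Stand-in for spark.sparkContext.textFile("/data/spark/people.txt");
// the sample lines below are assumed, not taken from the course file.
val peopleRDD = spark.sparkContext.parallelize(
  Seq("Michael, 29", "Andy, 30", "Justin, 19"))

// Step 1: convert each line into a Row.
val rowRDD = peopleRDD.map(_.split(",")).map(attrs => Row(attrs(0), attrs(1).trim))

// Steps 2 and 3: apply the StructType schema to the RDD of Rows.
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.show()

// Register the DataFrame as a temporary view and query it with SQL.
peopleDF.createOrReplaceTempView("people")
val names = spark.sql("SELECT name FROM people")
names.show()
```

Note that age is stored as a String here, so numeric comparisons in SQL would need a cast or a schema that declares the column with a numeric type.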