DataFrames, Spark SQL, R


Spark SQL - Dataframe Operations

Once you have created the dataframe, you can operate on it. There are many operations available on a dataframe.

To see the schema of a dataframe, we can call the printSchema method, and it shows the details of each column. Here, we can see that it has automatically inferred the data type of the age column as long and of the name column as string.
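For example, a minimal sketch in the spark-shell, assuming the /data/spark/people.json file used in this course:

var df = spark.read.json("/data/spark/people.json")
df.printSchema()

Output:

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)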

If we need to project just a single column, we can use the select method with the name of the column as an argument and then call the show method on it.
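For example, selecting just the name column of the same dataframe:

df.select("name").show()

Output:

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+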

You can see that it has displayed the values of just the name column. This is very much like dataframe operations in R.

If we want to do more complex projections on the data, such as adding 1 to the age and displaying it, we can simply use $"age" + 1. Please note that the evaluation is lazy in Spark, so you have to call show() or another action in order to start the computation.
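For example (the $"..." column syntax comes from spark.implicits._, which the spark-shell already imports for you):

df.select($"name", $"age" + 1).show()

Output:

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+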

The select method generates another dataframe, but that dataframe does not hold the actual data; if it did, it could cause a memory overflow. So, in order to avoid memory overflows and to optimize the computation, Spark uses a lazy evaluation model.

It does not do the computation unless we really ask for it. It just keeps on making notes.

Further, if we would like to filter our data based on various conditions, we can use the filter method on a dataframe. This method takes an expression, and in this expression you can refer to a column's value using the dollar sign.

Here you can see that Andy is the only one with age above 21. Please note that this filter is not the same method as the one on an RDD: the RDD's filter accepts a function as an argument, while here it accepts an expression.
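For example:

df.filter($"age" > 21).show()

Output:

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+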

A dataframe also provides a groupBy operation. groupBy returns a grouped dataset on which we can execute aggregates such as count. This computes the count of people of each age.

This operation is essentially equivalent to the SQL query: select age, count(*) from df group by age
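For example (a sketch; the row order of show() may differ):

df.groupBy("age").count().show()

Output:

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

To actually run the equivalent SQL, the dataframe first has to be registered as a temporary view; the view name "df" below is just an illustration:

df.createOrReplaceTempView("df")
spark.sql("select age, count(*) from df group by age").show()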

Spark - Dataframes & Spark SQL (Part1)



26 Comments

I tried this command in pyspark: df.select($"name",$"age"+2).show()

but it is giving me the error below:

File "<stdin>", line 1
    df.select($"name",$"age"+2).show()
              ^
SyntaxError: invalid syntax

Could you please help?

 


Why am I getting this error?


Hi Mayank,

The $"..." shorthand is Scala syntax and is not available in pyspark. You can run the query through the "spark.sql()" function instead. For example:

spark.sql("select age, count(*) from df group by age");

 


Why is it that we need not add the $ symbol to name when selecting a single column? Is the $ symbol mandatory when selecting multiple columns, or ...?


Please respond.


That is part of the syntax, I believe.


There has to be something more than the syntax.

1. //Select columns
df.select("name","age").show()

Output:

+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+

2. //Increment the age by 1
df.select($"name",$"age"+1).show()

Output:

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

Remove the $ from the 2nd command ("age"+1) and it will start giving an error:
Name: org.apache.spark.sql.AnalysisException
Message: cannot resolve '`age1`' given input columns: [age, name];;
'Project [name#1, 'age1]
+- Relation[age#0L,name#1] json
StackTrace: 'Project [name#1, 'age1]

It is mentioned in the URL below that $"columnName" is the "Scala short hand for a named column." Please explain if you get a breakthrough.

https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/Column.html

 


Whenever we want a computed value of a column, we use $"columnname".


Thanks Sandeep.

In that case only the column "age" should need the '$' prefix, since that is the computed column, but the column 'name' also needs to be prefixed with '$'. It gives an error otherwise.

df.select("name",$"age"+1).show

Name: Unknown Error
Message: <console>:46: error: overloaded method value select with alternatives:
  [U1, U2](c1: org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U1], c2: org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U2])org.apache.spark.sql.Dataset[(U1, U2)] <and>
  (col: String,cols: String*)org.apache.spark.sql.DataFrame <and>
  (cols: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame
 cannot be applied to (String, org.apache.spark.sql.Column)
       df.select("name",$"age"+1).show
          ^

 

 


The DataFrame's select method takes either all String arguments or all org.apache.spark.sql.Column arguments, but it doesn't take a mix of both.

In your select call, "name" is a String and $"age"+1 is a Column. Hence we have to apply '$' to name as well.
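A quick sketch of the two forms that do compile, using the same dataframe:

df.select("name", "age").show()          // all String arguments
df.select($"name", $"age" + 1).show()    // all Column arguments
// Mixing a String with a Column, e.g. df.select("name", $"age" + 1), matches none of the overloads.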



Hello,

By mistake I typed:

df.printschema()

Then I realized that the S should be a capital. Is there any setting that could make it case insensitive?

Thanks in advance

BT


It is case sensitive; method names in Scala are case sensitive, and we cannot disable that.


The select method basically generates another dataframe but it does not hold actual data else it could cause memory overflow. So, in order to avoid memory overflows

 

please explain this theory


Hi. 

collect() is what can cause a memory overflow; select is used to select only the required columns.

Calling collect() on an RDD returns the entire dataset to the driver, which can cause an out-of-memory error, so we should avoid it. collect is an action.

select is a transformation, not an action, so it is lazily evaluated (it won't actually do the calculations, it just records the operations).

So, we use select() to avoid memory overflows.

All the best!

 


In Spark, everything is basically lazy. When we call a transformation such as filter() or select(), it makes notes in the form of a DAG (Directed Acyclic Graph), and when we call an action such as take(), show() or collect(), it runs the entire graph of transformations and gives us the result.

Spark is designed for big data (more than a few GBs of data) computations, and therefore it is better to send the logic of the entire computation in a single shot for execution rather than sending the data generated at every step back and forth.
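As a small illustration (a sketch using the same df):

val adults = df.filter($"age" > 21).select($"name")   // nothing is computed yet, only the plan is built
adults.show()                                         // the action triggers the actual computation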


For anyone practicing in pyspark, these are the commands:
df.select(df.name, df.age.astype('int') +1 ).show()
df.filter(df.age > 21).show()
df.groupby('age').count().show()


HIVE connection issue.
I am trying to open the HIVE view in AMBARI (tried both HIVE and HIVE2) and I am getting a connection error.

Can someone please look into it? I previously tried using HUE and there were many issues with it, and it is difficult to type DDL queries from the console.


Is there any alternate method of referring to a dataframe column besides using the dollar sign? Can we use dataframe.<variable_name>?


Yes, you can simply use the name of the column. The dollar sign is needed when you want a new computed column.
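A few equivalent ways to refer to a column in Scala (a sketch; col comes from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.col

df.select(df("age") + 1).show()     // apply method on the dataframe
df.select(col("age") + 1).show()    // the col() function
df.select($"age" + 1).show()        // the $ shorthand from spark.implicits._
// In pyspark, df.age or df["age"] plays the same role.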


How can we add and delete a record in a dataframe using Spark SQL?


We can't; dataframes are immutable, so we need to create a new dataframe instead.
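A hedged sketch of what that looks like in practice (the extra row values below are made up for illustration):

import spark.implicits._

val extraRow = Seq((45L, "Carol")).toDF("age", "name")   // hypothetical record, same schema order as df
val added    = df.union(extraRow)                        // "add" a record by deriving a new dataframe
val deleted  = df.filter($"name" =!= "Justin")           // "delete" a record the same way
added.show()
deleted.show()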


scala> import org.apache.spark.sql.SparkSession
<console>:25: error: object SparkSession is not a member of package org.apache.spark.sql
import org.apache.spark.sql.SparkSession
^

Abhinav Singh

Hi @devasekhar,

Are you running this command in Spark 1 or Spark 2? Above command will only work in Spark 2.

Hope this helps.

Thanks


Hi,

At the spark-shell prompt I am getting the error below. Can you take a look at it?

var df= spark.read.json("/data/spark/people.json")
<console>:25: error: not found: value spark
var df= spark.read.json("/data/spark/people.json")

Abhinav Singh

Hi @devasekhar,

Are you running this command in Spark 1 or Spark 2? Above command will only work in Spark 2.
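For reference, in a Spark 1.x shell (1.4 or later) the pre-created sqlContext can be used instead; a sketch:

var df = sqlContext.read.json("/data/spark/people.json")
df.printSchema()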

Hope this helps.

Thanks
