Spark DataFrame Aggregate Functions

In Spark, you can perform aggregate operations on a DataFrame, similar to SQL functions like MAX, MIN, and SUM. You can also aggregate over specific columns, which is equivalent to the GROUP BY clause in typical SQL. Let’s see this with some examples.
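
The examples below run against a DataFrame named df_pres with columns pres_id, pres_bs (birth state), and pres_dob (date of birth). The original dataset isn’t shown here, so as a minimal sketch, a DataFrame with the same shape could be built like this (the three rows are illustrative only):

import org.apache.spark.sql.SparkSession
import java.sql.Date

val spark = SparkSession.builder().appName("agg-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Only a few illustrative rows; the real df_pres holds all 45 presidents.
val df_pres = Seq(
  (1,  "Virginia", Date.valueOf("1732-02-22")),
  (16, "Kentucky", Date.valueOf("1809-02-12")),
  (44, "Hawaii",   Date.valueOf("1961-08-04"))
).toDF("pres_id", "pres_bs", "pres_dob")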

The first method we can use is “agg”. To calculate count, min, max, and sum, we can use the syntax below:

scala> df_pres.agg(count($"pres_id"),min($"pres_id"),max($"pres_id"),sum("pres_id")).show()
+--------------+------------+------------+------------+
|count(pres_id)|min(pres_id)|max(pres_id)|sum(pres_id)|
+--------------+------------+------------+------------+
|            45|           1|          45|        1035|
+--------------+------------+------------+------------+
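
Note that the aggregate functions used above come from org.apache.spark.sql.functions, and the $"..." column syntax needs the session implicits. Also, “agg” without “groupBy” returns a single-row DataFrame, so instead of show() you can pull the results back as plain Scala values. A minimal sketch, assuming the same df_pres:

import org.apache.spark.sql.functions._

// agg without groupBy yields exactly one Row; extract typed values by position.
val row = df_pres.agg(count($"pres_id"), min($"pres_id"), max($"pres_id"), sum($"pres_id")).first()
val cnt: Long = row.getLong(0)   // count() always returns a Long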

Let’s add alias names to the columns.

scala> df_pres.agg(count($"pres_id").as("count"),min($"pres_id").as("min"),max($"pres_id").as("max"),sum("pres_id").as("sum")).show()
+-----+---+---+----+
|count|min|max| sum|
+-----+---+---+----+
|   45|  1| 45|1035|
+-----+---+---+----+
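
If you prefer SQL-style expressions, the same aliased aggregates can also be written with selectExpr; a quick sketch:

df_pres.selectExpr(
  "count(pres_id) as count",
  "min(pres_id) as min",
  "max(pres_id) as max",
  "sum(pres_id) as sum"
).show()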

The second method is to use “agg” with the “groupBy” method to aggregate over one or more specific columns. Let’s see this with examples:

scala> df_pres.groupBy($"pres_bs").count().show()
+--------------+-----+
|       pres_bs|count|
+--------------+-----+
|      Virginia|    8|
|North Carolina|    3|
| New Hampshire|    1|
|      Kentucky|    1|
|          Ohio|    7|
|    New Jersey|    2|
|      Missouri|    1|
|         Texas|    2|
| Massachusetts|    4|
|      New York|    5|
|  Pennsylvania|    1|
|       Vermont|    2|
|          Iowa|    1|
|    California|    1|
|      Nebraska|    1|
|       Georgia|    1|
|      Illinois|    1|
|      Arkansas|    1|
|   Connecticut|    1|
|        Hawaii|    1|
+--------------+-----+
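
The same grouped count can also be expressed in plain SQL by registering the DataFrame as a temporary view first; a sketch (the view name "pres" is arbitrary):

df_pres.createOrReplaceTempView("pres")
spark.sql("SELECT pres_bs, count(*) AS count FROM pres GROUP BY pres_bs").show()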

Let’s sort the output by count, with the highest count on top.

scala> df_pres.groupBy($"pres_bs").count().orderBy($"count".desc).show()
+--------------+-----+
|       pres_bs|count|
+--------------+-----+
|      Virginia|    8|
|          Ohio|    7|
|      New York|    5|
| Massachusetts|    4|
|North Carolina|    3|
|    New Jersey|    2|
|       Vermont|    2|
|         Texas|    2|
| New Hampshire|    1|
|      Missouri|    1|
|      Kentucky|    1|
|  Pennsylvania|    1|
|          Iowa|    1|
|       Georgia|    1|
|        Hawaii|    1|
|      Illinois|    1|
|   Connecticut|    1|
|    California|    1|
|      Arkansas|    1|
|      Nebraska|    1|
+--------------+-----+
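
The descending sort can be written in several equivalent ways; all of the following produce the same ordering:

import org.apache.spark.sql.functions.{col, desc}

df_pres.groupBy($"pres_bs").count().orderBy(desc("count")).show()     // functions.desc
df_pres.groupBy($"pres_bs").count().orderBy(col("count").desc).show() // Column.desc
df_pres.groupBy($"pres_bs").count().sort($"count".desc).show()        // sort is an alias of orderBy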

We could have used the “agg” function here with the “groupBy” method to get the same result.

scala> df_pres.groupBy($"pres_bs").agg(count($"pres_id").alias("cnt")).orderBy($"cnt".desc).show()
+--------------+---+
|       pres_bs|cnt|
+--------------+---+
|      Virginia|  8|
|          Ohio|  7|
|      New York|  5|
| Massachusetts|  4|
|North Carolina|  3|
|    New Jersey|  2|
|       Vermont|  2|
|         Texas|  2|
| New Hampshire|  1|
|      Missouri|  1|
|      Kentucky|  1|
|  Pennsylvania|  1|
|          Iowa|  1|
|       Georgia|  1|
|        Hawaii|  1|
|      Illinois|  1|
|   Connecticut|  1|
|    California|  1|
|      Arkansas|  1|
|      Nebraska|  1|
+--------------+---+
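
Note that the many states tied at a count of 1 can come back in any order, since Spark gives no ordering guarantee for ties. Adding a secondary sort key makes the output deterministic; a sketch:

// Sort by count descending, then by state name ascending to break ties.
df_pres.groupBy($"pres_bs")
  .agg(count($"pres_id").alias("cnt"))
  .orderBy($"cnt".desc, $"pres_bs".asc)
  .show()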

You can calculate multiple aggregates in the same “agg” call as required.

scala> df_pres.groupBy($"pres_bs").agg(count($"pres_id").alias("cnt"),max($"pres_dob").as("dob")).orderBy($"cnt".desc).show()
+--------------+---+----------+
|       pres_bs|cnt|       dob|
+--------------+---+----------+
|      Virginia|  8|1856-12-28|
|          Ohio|  7|1865-11-02|
|      New York|  5|1946-06-14|
| Massachusetts|  4|1924-06-12|
|North Carolina|  3|1808-12-29|
|    New Jersey|  2|1837-03-18|
|       Vermont|  2|1872-07-04|
|         Texas|  2|1908-08-27|
| New Hampshire|  1|1804-11-23|
|      Missouri|  1|1884-05-08|
|      Kentucky|  1|1809-02-12|
|  Pennsylvania|  1|1791-04-23|
|          Iowa|  1|1874-08-10|
|       Georgia|  1|1924-10-01|
|        Hawaii|  1|1961-08-04|
|      Illinois|  1|1911-02-06|
|   Connecticut|  1|1946-07-06|
|    California|  1|1913-01-09|
|      Arkansas|  1|1946-08-19|
|      Nebraska|  1|1913-07-14|
+--------------+---+----------+
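
A natural next step is the SQL HAVING pattern, i.e. filtering on an aggregate after grouping. With DataFrames this is simply a where() on the aggregated column; a sketch:

// Keep only birth states that produced more than one president.
df_pres.groupBy($"pres_bs")
  .agg(count($"pres_id").alias("cnt"), max($"pres_dob").as("dob"))
  .where($"cnt" > 1)
  .orderBy($"cnt".desc)
  .show()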

In the above query, we calculated COUNT on one column and MAX on another. You can also pass a “Map” of column-to-function pairs to “agg” to get the same result.

scala> df_pres.groupBy($"pres_bs").agg(Map("pres_id"->"count" , "pres_dob"->"max")).orderBy($"count(pres_id)".desc).show()
+--------------+--------------+-------------+
|       pres_bs|count(pres_id)|max(pres_dob)|
+--------------+--------------+-------------+
|      Virginia|             8|   1856-12-28|
|          Ohio|             7|   1865-11-02|
|      New York|             5|   1946-06-14|
| Massachusetts|             4|   1924-06-12|
|North Carolina|             3|   1808-12-29|
|    New Jersey|             2|   1837-03-18|
|       Vermont|             2|   1872-07-04|
|         Texas|             2|   1908-08-27|
| New Hampshire|             1|   1804-11-23|
|      Missouri|             1|   1884-05-08|
|      Kentucky|             1|   1809-02-12|
|  Pennsylvania|             1|   1791-04-23|
|          Iowa|             1|   1874-08-10|
|       Georgia|             1|   1924-10-01|
|        Hawaii|             1|   1961-08-04|
|      Illinois|             1|   1911-02-06|
|   Connecticut|             1|   1946-07-06|
|    California|             1|   1913-01-09|
|      Arkansas|             1|   1946-08-19|
|      Nebraska|             1|   1913-07-14|
+--------------+--------------+-------------+
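
One limitation of the Map form is that it offers no inline aliasing, which is why the output columns come back as count(pres_id) and max(pres_dob). If you need friendlier names, you can rename them afterwards; a sketch:

df_pres.groupBy($"pres_bs")
  .agg(Map("pres_id" -> "count", "pres_dob" -> "max"))
  .withColumnRenamed("count(pres_id)", "cnt")
  .withColumnRenamed("max(pres_dob)", "dob")
  .orderBy($"cnt".desc)
  .show()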

So we have seen the following cases in this post:
1) You can use the “agg” method directly on a DataFrame if no grouping is required.
2) You can use “groupBy” along with “agg” to calculate measures grouped by one or more columns.
3) There are multiple ways of writing the same aggregate calculations.

Hope this helps.
