In Spark, aggregate functions group multiple rows into one and calculate measures over them, much like MAX, MIN, SUM, and COUNT in SQL. You can apply these aggregations across a whole DataFrame, or per group of rows on specific columns, which is the equivalent of the GROUP BY clause in SQL. Let's see this with some examples.
Spark AGG function
The first method is "agg", called directly on the DataFrame. To calculate count, min, max, and sum, we can use the following syntax:
scala> df_pres.agg(count($"pres_id"),min($"pres_id"),max($"pres_id"),sum("pres_id")).show()
+--------------+------------+------------+------------+
|count(pres_id)|min(pres_id)|max(pres_id)|sum(pres_id)|
+--------------+------------+------------+------------+
|            45|           1|          45|        1035|
+--------------+------------+------------+------------+
Let's add alias names to the output columns.
scala> df_pres.agg(count($"pres_id").as("count"),min($"pres_id").as("min"),max($"pres_id").as("max"),sum("pres_id").as("sum")).show()
+-----+---+---+----+
|count|min|max| sum|
+-----+---+---+----+
|   45|  1| 45|1035|
+-----+---+---+----+
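The same pattern works for the other aggregate functions in org.apache.spark.sql.functions (these are imported automatically in spark-shell). As a minimal sketch, assuming the same df_pres DataFrame (output omitted here):

scala> df_pres.agg(avg($"pres_id").as("avg_id"), countDistinct($"pres_bs").as("distinct_bs")).show()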
Spark GroupBy function
The second method is "groupBy", which aggregates over one or more grouping columns. Let's see this with examples:
scala> df_pres.groupBy($"pres_bs").count().show() +--------------+-----+ | pres_bs|count| +--------------+-----+ | Virginia| 8| |North Carolina| 3| | New Hampshire| 1| | Kentucky| 1| | Ohio| 7| | New Jersey| 2| | Missouri| 1| | Texas| 2| | Massachusetts| 4| | New York| 5| | Pennsylvania| 1| | Vermont| 2| | Iowa| 1| | California| 1| | Nebraska| 1| | Georgia| 1| | Illinois| 1| | Arkansas| 1| | Connecticut| 1| | Hawaii| 1| +--------------+-----+
Let's sort the output by count, with the highest count on top.
scala> df_pres.groupBy($"pres_bs").count().orderBy($"count".desc).show()
+--------------+-----+
|       pres_bs|count|
+--------------+-----+
|      Virginia|    8|
|          Ohio|    7|
|      New York|    5|
| Massachusetts|    4|
|North Carolina|    3|
|    New Jersey|    2|
|       Vermont|    2|
|         Texas|    2|
| New Hampshire|    1|
|      Missouri|    1|
|      Kentucky|    1|
|  Pennsylvania|    1|
|          Iowa|    1|
|       Georgia|    1|
|        Hawaii|    1|
|      Illinois|    1|
|   Connecticut|    1|
|    California|    1|
|      Arkansas|    1|
|      Nebraska|    1|
+--------------+-----+
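Note that rows sharing the same count may come back in any order. If you want a stable listing, you can add a secondary sort key, for example the state name:

scala> df_pres.groupBy($"pres_bs").count().orderBy($"count".desc, $"pres_bs".asc).show()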
Spark GroupBy with AGG function
We can also combine "agg" with "groupBy" to get the same result:
scala> df_pres.groupBy($"pres_bs").agg(count($"pres_id").alias("cnt")).orderBy($"cnt".desc).show() +--------------+---+ | pres_bs|cnt| +--------------+---+ | Virginia| 8| | Ohio| 7| | New York| 5| | Massachusetts| 4| |North Carolina| 3| | New Jersey| 2| | Vermont| 2| | Texas| 2| | New Hampshire| 1| | Missouri| 1| | Kentucky| 1| | Pennsylvania| 1| | Iowa| 1| | Georgia| 1| | Hawaii| 1| | Illinois| 1| | Connecticut| 1| | California| 1| | Arkansas| 1| | Nebraska| 1| +--------------+---+
You can calculate multiple aggregates in the same "agg" call as required.
scala> df_pres.groupBy($"pres_bs").agg(count($"pres_id").alias("cnt"),max($"pres_dob").as("dob")).orderBy($"cnt".desc).show() +--------------+---+----------+ | pres_bs|cnt| dob| +--------------+---+----------+ | Virginia| 8|1856-12-28| | Ohio| 7|1865-11-02| | New York| 5|1946-06-14| | Massachusetts| 4|1924-06-12| |North Carolina| 3|1808-12-29| | New Jersey| 2|1837-03-18| | Vermont| 2|1872-07-04| | Texas| 2|1908-08-27| | New Hampshire| 1|1804-11-23| | Missouri| 1|1884-05-08| | Kentucky| 1|1809-02-12| | Pennsylvania| 1|1791-04-23| | Iowa| 1|1874-08-10| | Georgia| 1|1924-10-01| | Hawaii| 1|1961-08-04| | Illinois| 1|1911-02-06| | Connecticut| 1|1946-07-06| | California| 1|1913-01-09| | Arkansas| 1|1946-08-19| | Nebraska| 1|1913-07-14| +--------------+---+----------+
Spark AGG with MAP function
In the above query, we calculated COUNT on one column and MAX on another. You can also pass a Scala Map of column-to-function pairs to "agg" to get the same result.
scala> df_pres.groupBy($"pres_bs").agg(Map("pres_id"->"count" , "pres_dob"->"max")).orderBy($"count(pres_id)".desc).show() +--------------+--------------+-------------+ | pres_bs|count(pres_id)|max(pres_dob)| +--------------+--------------+-------------+ | Virginia| 8| 1856-12-28| | Ohio| 7| 1865-11-02| | New York| 5| 1946-06-14| | Massachusetts| 4| 1924-06-12| |North Carolina| 3| 1808-12-29| | New Jersey| 2| 1837-03-18| | Vermont| 2| 1872-07-04| | Texas| 2| 1908-08-27| | New Hampshire| 1| 1804-11-23| | Missouri| 1| 1884-05-08| | Kentucky| 1| 1809-02-12| | Pennsylvania| 1| 1791-04-23| | Iowa| 1| 1874-08-10| | Georgia| 1| 1924-10-01| | Hawaii| 1| 1961-08-04| | Illinois| 1| 1911-02-06| | Connecticut| 1| 1946-07-06| | California| 1| 1913-01-09| | Arkansas| 1| 1946-08-19| | Nebraska| 1| 1913-07-14| +--------------+--------------+-------------+
So we have seen the following cases in this post:
1) You can call "agg" directly on a DataFrame if no grouping is required.
2) You can use "groupBy" along with "agg" to calculate measures per group of columns.
3) There are multiple ways of writing the same aggregate calculations, including the plain-SQL sketch below.
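For comparison, the same groupBy result can also be produced with plain SQL by registering the DataFrame as a temporary view (a sketch; the view name presidents is arbitrary):

scala> df_pres.createOrReplaceTempView("presidents")
scala> spark.sql("SELECT pres_bs, count(pres_id) AS cnt FROM presidents GROUP BY pres_bs ORDER BY cnt DESC").show()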
Hope this helps.