Spark DISTINCT or Spark dropDuplicates is used to remove duplicate rows in a DataFrame. A row consists of columns, so if you select only one column, the output will be the unique values for that specific column. DISTINCT is very commonly used to identify the possible values that exist in the dataframe for any given column.
Spark read csv and create dataframe
val df_pres = spark.read.option("header",true).csv("hdfs:///var/data/pres_data.csv")

df_pres.printSchema()
root
 |-- pres_id: string (nullable = true)
 |-- pres_name: string (nullable = true)
 |-- pres_dob: string (nullable = true)
 |-- pres_bp: string (nullable = true)
 |-- pres_bs: string (nullable = true)
 |-- pres_in: string (nullable = true)
 |-- pres_out: string (nullable = true)
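Notice that every column was read as a string because we did not ask Spark to infer types. As a minimal variation (using the same file path; df_pres_typed is a hypothetical name, and the rest of this article keeps using the string-typed df_pres), you could let Spark infer the column types, which would make pres_id an integer and avoid the explicit cast used in the sort examples below:

// Hypothetical variation: let Spark infer column types from the data.
// With inferSchema enabled, pres_id comes back as an integer type.
val df_pres_typed = spark.read
  .option("header", true)
  .option("inferSchema", true)
  .csv("hdfs:///var/data/pres_data.csv")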
We will use the above created dataframe in the below examples. Let's check all the state values in the dataframe.
scala> df_pres.select($"pres_bs").show(50) +--------------------+ | pres_bs| +--------------------+ | Virginia| | Massachusetts| | Virginia| | Virginia| | Virginia| | Massachusetts| |South/North Carolina| | New York| | Virginia| | Virginia| | North Carolina| | Virginia| | New York| | New Hampshire| | Pennsylvania| | Kentucky| | North Carolina| | Ohio| | Ohio| | Ohio| | Vermont| | New Jersey| | Ohio| | New Jersey| | Ohio| | New York| | Ohio| | Virginia| | Ohio| | Vermont| | Iowa| | New York| | Missouri| | Texas| | Massachusetts| | Texas| | California| | Nebraska| | Georgia| | Illinois| | Massachusetts| | Arkansas| | Connecticut| | Hawaii| | New York| | Pennsylvania| +--------------------+ scala> df_pres.select($"pres_bs").count() res4: Long = 46
In the above example, we can see there are 46 rows in total in the output.
You can see it has many duplicate values. If you just want the unique values, then use the distinct function.
Spark unique values in column
scala> df_pres.select($"pres_bs").distinct().show(50) +--------------------+ | pres_bs| +--------------------+ | Nebraska| | Missouri| | Massachusetts| | Kentucky| | New York| | Connecticut| |South/North Carolina| | Iowa| | Hawaii| | Arkansas| | Pennsylvania| | Illinois| | North Carolina| | Ohio| | Texas| | Vermont| | Virginia| | Georgia| | New Jersey| | New Hampshire| | California| +--------------------+ scala> df_pres.select($"pres_bs").distinct().count() res7: Long = 21
In the above example, you can see there are 21 unique values in the column.
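If you only need the number of distinct values rather than the values themselves, the countDistinct function from org.apache.spark.sql.functions returns it in a single aggregation. A short sketch:

// countDistinct lives in org.apache.spark.sql.functions
import org.apache.spark.sql.functions.countDistinct

// Returns a one-row DataFrame holding the distinct count of pres_bs
df_pres.select(countDistinct($"pres_bs")).show()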
Spark dataframe drop duplicates
scala> df_pres.select($"pres_bs").dropDuplicates().show(50) +--------------------+ | pres_bs| +--------------------+ | Nebraska| | Missouri| | Massachusetts| | Kentucky| | New York| | Connecticut| |South/North Carolina| | Iowa| | Hawaii| | Arkansas| | Pennsylvania| | Illinois| | North Carolina| | Ohio| | Texas| | Vermont| | Virginia| | Georgia| | New Jersey| | New Hampshire| | California| +--------------------+ scala> df_pres.select($"pres_bs").dropDuplicates().count() res9: Long = 21
In the above example, we see 21 unique rows in the output. We fetched only one column and applied dropDuplicates to it to get the unique values in the column.
Spark distinct vs dropDuplicates
Both distinct & drop duplicates can be used to remove duplicate rows and get only unique values in the output. However there is added functionality in drop duplicates which allows it to accept column list for which you want unique value. This column list can be subset of actual select list. You can select 10 columns and do unique check on 5 columns only using drop duplicates. However that is not possible with DISTINCT. The SELECT list and DISTINCT column list is same. But SELECT list and DROP DUPLICATE column list can be different.
Let us see this with an example.
Spark DISTINCT
scala> df_pres.select($"pres_id",$"pres_bs").distinct().sort($"pres_id".cast("integer")).show(50) +-------+--------------------+ |pres_id| pres_bs| +-------+--------------------+ | 1| Virginia| | 2| Massachusetts| | 3| Virginia| | 4| Virginia| | 5| Virginia| | 6| Massachusetts| | 7|South/North Carolina| | 8| New York| | 9| Virginia| | 10| Virginia| | 11| North Carolina| | 12| Virginia| | 13| New York| | 14| New Hampshire| | 15| Pennsylvania| | 16| Kentucky| | 17| North Carolina| | 18| Ohio| | 19| Ohio| | 20| Ohio| | 21| Vermont| | 22| New Jersey| | 23| Ohio| | 24| New Jersey| | 25| Ohio| | 26| New York| | 27| Ohio| | 28| Virginia| | 29| Ohio| | 30| Vermont| | 31| Iowa| | 32| New York| | 33| Missouri| | 34| Texas| | 35| Massachusetts| | 36| Texas| | 37| California| | 38| Nebraska| | 39| Georgia| | 40| Illinois| | 41| Massachusetts| | 42| Arkansas| | 43| Connecticut| | 44| Hawaii| | 45| New York| | 46| Pennsylvania| +-------+--------------------+ scala> df_pres.select($"pres_id",$"pres_bs").distinct().sort($"pres_id".cast("integer")).count() res16: Long = 46
In the above example, we can see there are 46 distinct rows when we select 2 columns. Since pres_id is a sequence and is unique, we get all the records in the output.
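If you want to see which states actually repeat, rather than just deduplicate them, a quick groupBy/count sketch over the same dataframe shows the duplicated values and how often each occurs:

// Group by birth state and keep only the states that appear more than once
df_pres.groupBy($"pres_bs")
  .count()
  .filter($"count" > 1)
  .sort($"count".desc)
  .show()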
Spark drop duplicates
scala> df_pres.select($"pres_id",$"pres_bs").dropDuplicates("pres_bs").sort($"pres_id".cast("integer")).show(50) +-------+--------------------+ |pres_id| pres_bs| +-------+--------------------+ | 1| Virginia| | 2| Massachusetts| | 7|South/North Carolina| | 8| New York| | 11| North Carolina| | 14| New Hampshire| | 15| Pennsylvania| | 16| Kentucky| | 18| Ohio| | 21| Vermont| | 22| New Jersey| | 31| Iowa| | 33| Missouri| | 34| Texas| | 37| California| | 38| Nebraska| | 39| Georgia| | 40| Illinois| | 42| Arkansas| | 43| Connecticut| | 44| Hawaii| +-------+--------------------+ scala> df_pres.select($"pres_id",$"pres_bs").dropDuplicates("pres_bs").sort($"pres_id".cast("integer")).count() res20: Long = 21
You can see in the above example that we can pass a column list into the dropDuplicates function in Spark to remove duplicate records on the basis of a subset of columns only, and not all the columns mentioned in the select list.
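One caveat worth noting: when dropDuplicates keeps one row per pres_bs, Spark does not guarantee which of the duplicate rows survives. If you need a deterministic pick, say the lowest pres_id per state, a window-function sketch like the one below does that. The column names match the dataframe above, but treat this as an illustration of the approach rather than a drop-in replacement:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Rank the rows within each state by pres_id and keep only the first one,
// which guarantees that the lowest pres_id per state survives.
val w = Window.partitionBy($"pres_bs").orderBy($"pres_id".cast("integer"))

df_pres.select($"pres_id", $"pres_bs")
  .withColumn("rn", row_number().over(w))
  .filter($"rn" === 1)
  .drop("rn")
  .sort($"pres_id".cast("integer"))
  .show(50)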