
Spark Dataframe JOINs – the only post you need to read

A JOIN is used to retrieve data from two tables or dataframes. You will need "n" join operations to fetch data from "n+1" dataframes. To join two dataframes you use the "join" function, which requires 3 inputs: the dataframe to join with, the columns on which to join, and the type of join to execute.

You can replicate almost all join types available in a typical SQL environment using Spark dataframes. We will discuss the following join types in this post, using two sample dataframes sketched right after the list:

  1. SPARK INNER JOIN
  2. SPARK LEFT JOIN
  3. SPARK RIGHT JOIN
  4. SPARK FULL JOIN
  5. SPARK LEFT SEMI JOIN
  6. SPARK LEFT ANTI JOIN
  7. SPARK CROSS JOIN
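
The examples below join two dataframes: df_pres (US presidents, with a pres_bs column holding each president's birth state) and df_states (US states). This post does not show how they were built, so here is a minimal sketch with made-up rows and only the key columns the join conditions need; the real datasets carry more columns, as the schema printouts below show:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("spark-joins").master("local[*]").getOrCreate()
import spark.implicits._

// Minimal stand-ins for the two dataframes used throughout this post.
val df_pres = Seq(
  (1, "George Washington", "Virginia"),
  (2, "John Adams", "Massachusetts"),
  (16, "Abraham Lincoln", "Kentucky")
).toDF("pres_id", "pres_name", "pres_bs")

val df_states = Seq(
  (1, "Virginia"),
  (2, "Massachusetts"),
  (3, "Hawaii")
).toDF("state_id", "state_name")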

SPARK INNER JOIN

An INNER JOIN fetches only the common data between two tables, or in this case two dataframes. You can join two dataframes on the basis of one or more key columns and get the required data into another output dataframe.

Below is an example of an INNER JOIN using Spark dataframes:

val df_pres_states_inner = df_states
  .as("tb1")
  .join(df_pres.as("tb2"), $"tb2.pres_bs" === $"tb1.state_name", "inner")

df_pres_states_inner: org.apache.spark.sql.DataFrame = [pres_id: tinyint, pres_name: string ... 11 more fields]

A few points to observe from the statement above:

  1. We have used the "join" operator, which takes 3 arguments. The first is the other dataframe you want to join with. The second is the join condition on the key columns. The third is the join type, in this case "inner".
  2. If you do not add a select at the end, all the columns from both dataframes will appear in the output; see the sketch after this list for trimming the result.
  3. Since this is an inner join, only the matching records appear in the output.
  4. We have used alias names for the dataframes in the query and recommend doing so, as it reduces the complexity of the query and avoids confusion when the same column name exists in both dataframes.
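
For example, a select after the join trims the output to just the columns you need (the column choice here is illustrative):

// Same inner join, projecting only the columns we care about.
val df_inner_slim = df_states
  .as("tb1")
  .join(df_pres.as("tb2"), $"tb2.pres_bs" === $"tb1.state_name", "inner")
  .select($"tb1.state_name", $"tb2.pres_name")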

SPARK LEFT JOIN

A left outer join returns all the rows from the dataframe on the left side and the matching records from the dataframe on the right side. If a record does not exist in the right-side dataframe, the output shows NULL in the right-side columns for that row.

val df_pres_states_leftouter = df_states
  .as("tb1")
  .join(df_pres.as("tb2"), $"tb2.pres_bs" === $"tb1.state_name", "leftouter")

df_pres_states_leftouter: org.apache.spark.sql.DataFrame = [state_id: tinyint, state_name: string ... 11 more fields]

You could have used either "leftouter" or just "left" as the join type in the query above; both mean the same thing. All the columns from both dataframes appear in the output; if you need only a few columns, use a select, as in the sketch below.
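
A quick sketch of both points, using "left" as the join type and a select to trim the output (the column choice is illustrative; pres_name comes back NULL for states with no match):

val df_left_slim = df_states
  .as("tb1")
  .join(df_pres.as("tb2"), $"tb2.pres_bs" === $"tb1.state_name", "left")
  .select($"tb1.state_name", $"tb2.pres_name")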

SPARK RIGHT JOIN

A right outer join is similar to a LEFT outer join, the only difference being that it brings all the records from the dataframe on the right side and only the matching records from the dataframe on the left side. Where there is no match, the left-side columns are NULL. You can use "rightouter" or just "right" as the join type.

val df_pres_states_rightouter = df_states
  .as("tb1")
  .join(df_pres.as("tb2"), $"tb2.pres_bs" === $"tb1.state_name", "rightouter")

df_pres_states_rightouter: org.apache.spark.sql.DataFrame = [state_id: tinyint, state_name: string ... 11 more fields]
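
Note that a right outer join is just a left outer join with the two dataframes swapped; the sketch below should return the same rows as the query above, though the column order differs:

// Equivalent to the rightouter query above, with sides swapped.
val df_pres_states_swapped = df_pres
  .as("tb2")
  .join(df_states.as("tb1"), $"tb2.pres_bs" === $"tb1.state_name", "leftouter")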

SPARK FULL JOIN

A full outer join returns the matching records from both dataframes as well as the non-matching records from each side. Column values are NULL for the non-matching side of each such row. You can use "outer", "full", or "fullouter" as the join type in the query below; all three mean the same and give the same result.

val df_pres_states_fullouter = df_states
  .as("tb1")
  .join(df_pres.as("tb2"), $"tb2.pres_bs" === $"tb1.state_name", "fullouter")

df_pres_states_fullouter: org.apache.spark.sql.DataFrame = [state_id: tinyint, state_name: string ... 11 more fields]
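
A common follow-up to a full outer join is to collapse the two key columns into a single one, since each side's key is NULL on the other side's unmatched rows. A sketch using coalesce (the join_key column name is made up):

import org.apache.spark.sql.functions.coalesce

// One unified key: take state_name where present, else pres_bs.
val df_full_key = df_pres_states_fullouter
  .withColumn("join_key", coalesce($"tb1.state_name", $"tb2.pres_bs"))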

SPARK LEFT SEMI JOIN

A left semi join returns the records from the left dataframe for which a matching value exists in the right-side dataframe. The difference between a LEFT OUTER JOIN and a LEFT SEMI JOIN is in the output returned: a left outer join keeps all the records from the left dataframe, while a left semi join keeps only the matching ones. A left outer join can also produce one-to-many matches, increasing the number of output rows, whereas a left semi join never returns more rows than the left dataframe has, even when one-to-many matches exist. It behaves like an SQL "IN" condition.

The output dataframe will contain columns from the LEFT dataframe only.

val df_pres_states_leftsemi = df_states
  .as("tb1")
  .join(df_pres.as("tb2"), $"tb2.pres_bs" === $"tb1.state_name", "leftsemi")

df_pres_states_leftsemi: org.apache.spark.sql.DataFrame = [state_id: tinyint, state_name: string ... 4 more fields]
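
To make the IN analogy concrete, the same result can be written in Spark SQL with a subquery (the view names states and pres are made up for this sketch):

df_states.createOrReplaceTempView("states")
df_pres.createOrReplaceTempView("pres")

// States that appear as some president's birth state.
val df_semi_sql = spark.sql(
  "SELECT * FROM states WHERE state_name IN (SELECT pres_bs FROM pres)")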

SPARK LEFT ANTI JOIN

A LEFT anti join returns all the records from the LEFT dataframe that have no match in the right-side dataframe. It is the opposite of a LEFT SEMI join, which returns only the matching records. It behaves like an SQL "NOT IN" condition. The output contains columns from the LEFT dataframe only.

val df_pres_states_leftanti = df_states
  .as("tb1")
  .join(df_pres.as("tb2"), $"tb2.pres_bs" === $"tb1.state_name", "leftanti")

df_pres_states_leftanti: org.apache.spark.sql.DataFrame = [state_id: tinyint, state_name: string ... 4 more fields]
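
As a usage example, this anti join directly answers "which states have never produced a president". The NOT IN analogy can also be written in Spark SQL against the temp views registered earlier, with one caveat: NOT IN returns no rows at all if the subquery produces any NULLs, whereas a left anti join does not have that problem:

// States with no matching president birth state.
df_pres_states_leftanti.select("state_name").show()

// SQL equivalent, assuming pres_bs contains no NULLs.
val df_anti_sql = spark.sql(
  "SELECT * FROM states WHERE state_name NOT IN (SELECT pres_bs FROM pres)")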

SPARK CROSS JOIN

A cross join returns every possible combination of output rows by pairing each row from the LEFT dataframe with each row from the RIGHT dataframe. It produces m * n output rows, where m and n are the row counts of the left and right dataframes respectively.

val df_pres_states_cross = df_states.as("tb1").crossJoin(df_pres.as("tb2"))

df_pres_states_cross: org.apache.spark.sql.DataFrame = [state_id: tinyint, state_name: string ... 11 more fields]

The join above will result in 2250 rows (50 states * 45 presidents = 2250 rows). It is a very expensive operation and should be avoided unless you genuinely need a cartesian product.
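
One caveat: Spark 2.x refuses to plan an implicit cartesian product (a join with no join condition) unless you opt in, which is why the explicit crossJoin method is used above. If you hit that error on Spark 2.x, the opt-in looks like this (Spark 3.x enables it by default):

// Allow implicit cartesian products (Spark 2.x only; on by default in 3.x).
spark.conf.set("spark.sql.crossJoin.enabled", "true")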
