Spark Dataframe – monotonically_increasing_id

Adding a row number to a Spark dataframe is a very common requirement, especially if you are working on ELT in Spark. You can use the monotonically_increasing_id function to generate unique, increasing numbers. However, the numbers won't be consecutive if the dataframe has more than 1 partition. Let's look at a simple example to understand this:
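The transcripts below come from spark-shell, which pre-imports org.apache.spark.sql.functions._. That is why monotonically_increasing_id is called there without an import. In a compiled application you need the import yourself. Here is a minimal sketch, assuming Spark 2.x or 3.x on the classpath. The object and method names (RowIds, withStateId) are hypothetical:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.monotonically_increasing_id

object RowIds {
  // Adds a "state_id" column of unique, increasing Long values.
  // The values are only consecutive when the dataframe has a single partition.
  def withStateId(df: DataFrame): DataFrame =
    df.withColumn("state_id", monotonically_increasing_id())
}
```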

scala> df_usa.show(false)
+-----------+----------+-------------+
|state_name |state_abbr|state_capital|
+-----------+----------+-------------+
|Alabama    |AL        |Montgomery   |
|Alaska     |AK        |Juneau       |
|Arizona    |AZ        |Phoenix      |
|Arkansas   |AR        |Little Rock  |
|California |CA        |Sacramento   |
|Colorado   |CO        |Denver       |
|Connecticut|CT        |Hartford     |
|Delaware   |DE        |Dover        |
|Florida    |FL        |Tallahassee  |
|Georgia    |GA        |Atlanta      |
|Hawaii     |HI        |Honolulu     |
|Idaho      |ID        |Boise        |
|Illinois   |IL        |Springfield  |
|Indiana    |IN        |Indianapolis |
|Iowa       |IA        |Des Moines   |
|Kansas     |KS        |Topeka       |
|Kentucky   |KY        |Frankfort    |
|Louisiana  |LA        |Baton Rouge  |
|Maine      |ME        |Augusta      |
|Maryland   |MD        |Annapolis    |
+-----------+----------+-------------+
only showing top 20 rows

So I have a dataframe with information about all 50 states in the USA. Now I want to add a new column “state_id” as a sequence number. Before that, I will create 3 versions of this dataframe with 1, 2 and 3 partitions respectively. This will help us understand the behavior of monotonically_increasing_id clearly.

scala> val df_usa1 = df_usa.repartition(1)
df_usa1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [state_name: string, state_abbr: string ... 1 more field]

scala> val df_usa2 = df_usa.repartition(2)
df_usa2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [state_name: string, state_abbr: string ... 1 more field]

scala> val df_usa3 = df_usa.repartition(3)
df_usa3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [state_name: string, state_abbr: string ... 1 more field]
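Before comparing the IDs, it can help to confirm how many rows landed in each partition. Below is a small sketch using spark_partition_id() from org.apache.spark.sql.functions. The helper name PartitionStats.rowsPerPartition is hypothetical:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.spark_partition_id

object PartitionStats {
  // Returns (partition id, row count) pairs ordered by partition id.
  // spark_partition_id() is evaluated before the groupBy shuffle, so it
  // reflects the partitioning of the input dataframe.
  // Empty partitions do not appear in the result.
  def rowsPerPartition(df: DataFrame): Seq[(Int, Long)] =
    df.withColumn("pid", spark_partition_id())
      .groupBy("pid")
      .count()
      .orderBy("pid")
      .collect()
      .map(r => (r.getInt(0), r.getLong(1)))
      .toSeq
}
```

For example, PartitionStats.rowsPerPartition(df_usa3) shows the row distribution behind the 3-partition output.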

Now let’s generate the numbers and compare the output.

monotonically_increasing_id – single partition dataframe

scala> df_usa1.withColumn("state_id",monotonically_increasing_id).show(50)
+--------------+----------+--------------+--------+
|    state_name|state_abbr| state_capital|state_id|
+--------------+----------+--------------+--------+
|       Alabama|        AL|    Montgomery|       0|
|        Alaska|        AK|        Juneau|       1|
|       Arizona|        AZ|       Phoenix|       2|
|      Arkansas|        AR|   Little Rock|       3|
|    California|        CA|    Sacramento|       4|
|      Colorado|        CO|        Denver|       5|
|   Connecticut|        CT|      Hartford|       6|
|      Delaware|        DE|         Dover|       7|
|       Florida|        FL|   Tallahassee|       8|
|       Georgia|        GA|       Atlanta|       9|
|        Hawaii|        HI|      Honolulu|      10|
|         Idaho|        ID|         Boise|      11|
|      Illinois|        IL|   Springfield|      12|
|       Indiana|        IN|  Indianapolis|      13|
|          Iowa|        IA|    Des Moines|      14|
|        Kansas|        KS|        Topeka|      15|
|      Kentucky|        KY|     Frankfort|      16|
|     Louisiana|        LA|   Baton Rouge|      17|
|         Maine|        ME|       Augusta|      18|
|      Maryland|        MD|     Annapolis|      19|
| Massachusetts|        MA|        Boston|      20|
|      Michigan|        MI|       Lansing|      21|
|     Minnesota|        MN|      St. Paul|      22|
|   Mississippi|        MS|       Jackson|      23|
|      Missouri|        MO|Jefferson City|      24|
|       Montana|        MT|        Helena|      25|
|      Nebraska|        NE|       Lincoln|      26|
|        Nevada|        NV|   Carson City|      27|
| New Hampshire|        NH|       Concord|      28|
|    New Jersey|        NJ|       Trenton|      29|
|    New Mexico|        NM|      Santa Fe|      30|
|      New York|        NY|        Albany|      31|
|North Carolina|        NC|       Raleigh|      32|
|  North Dakota|        ND|      Bismarck|      33|
|          Ohio|        OH|      Columbus|      34|
|      Oklahoma|        OK| Oklahoma City|      35|
|        Oregon|        OR|         Salem|      36|
|  Pennsylvania|        PA|    Harrisburg|      37|
|  Rhode Island|        RI|    Providence|      38|
|South Carolina|        SC|      Columbia|      39|
|  South Dakota|        SD|        Pierre|      40|
|     Tennessee|        TN|     Nashville|      41|
|         Texas|        TX|        Austin|      42|
|          Utah|        UT|Salt Lake City|      43|
|       Vermont|        VT|    Montpelier|      44|
|      Virginia|        VA|      Richmond|      45|
|    Washington|        WA|       Olympia|      46|
| West Virginia|        WV|    Charleston|      47|
|     Wisconsin|        WI|       Madison|      48|
|       Wyoming|        WY|      Cheyenne|      49|
+--------------+----------+--------------+--------+

monotonically_increasing_id – two partition dataframe

scala> df_usa2.withColumn("state_id",monotonically_increasing_id).show(50)
+--------------+----------+--------------+----------+
|    state_name|state_abbr| state_capital|  state_id|
+--------------+----------+--------------+----------+
|       Alabama|        AL|    Montgomery|         0|
|       Arizona|        AZ|       Phoenix|         1|
|    California|        CA|    Sacramento|         2|
|   Connecticut|        CT|      Hartford|         3|
|       Florida|        FL|   Tallahassee|         4|
|        Hawaii|        HI|      Honolulu|         5|
|      Illinois|        IL|   Springfield|         6|
|          Iowa|        IA|    Des Moines|         7|
|      Kentucky|        KY|     Frankfort|         8|
|         Maine|        ME|       Augusta|         9|
| Massachusetts|        MA|        Boston|        10|
|     Minnesota|        MN|      St. Paul|        11|
|      Missouri|        MO|Jefferson City|        12|
|       Montana|        MT|        Helena|        13|
|        Nevada|        NV|   Carson City|        14|
|    New Jersey|        NJ|       Trenton|        15|
|      New York|        NY|        Albany|        16|
|  North Dakota|        ND|      Bismarck|        17|
|      Oklahoma|        OK| Oklahoma City|        18|
|  Pennsylvania|        PA|    Harrisburg|        19|
|South Carolina|        SC|      Columbia|        20|
|     Tennessee|        TN|     Nashville|        21|
|          Utah|        UT|Salt Lake City|        22|
|      Virginia|        VA|      Richmond|        23|
| West Virginia|        WV|    Charleston|        24|
|       Wyoming|        WY|      Cheyenne|        25|
|        Alaska|        AK|        Juneau|8589934592|
|      Arkansas|        AR|   Little Rock|8589934593|
|      Colorado|        CO|        Denver|8589934594|
|      Delaware|        DE|         Dover|8589934595|
|       Georgia|        GA|       Atlanta|8589934596|
|         Idaho|        ID|         Boise|8589934597|
|       Indiana|        IN|  Indianapolis|8589934598|
|        Kansas|        KS|        Topeka|8589934599|
|     Louisiana|        LA|   Baton Rouge|8589934600|
|      Maryland|        MD|     Annapolis|8589934601|
|      Michigan|        MI|       Lansing|8589934602|
|   Mississippi|        MS|       Jackson|8589934603|
|      Nebraska|        NE|       Lincoln|8589934604|
| New Hampshire|        NH|       Concord|8589934605|
|    New Mexico|        NM|      Santa Fe|8589934606|
|North Carolina|        NC|       Raleigh|8589934607|
|          Ohio|        OH|      Columbus|8589934608|
|        Oregon|        OR|         Salem|8589934609|
|  Rhode Island|        RI|    Providence|8589934610|
|  South Dakota|        SD|        Pierre|8589934611|
|         Texas|        TX|        Austin|8589934612|
|       Vermont|        VT|    Montpelier|8589934613|
|    Washington|        WA|       Olympia|8589934614|
|     Wisconsin|        WI|       Madison|8589934615|
+--------------+----------+--------------+----------+

monotonically_increasing_id – three partition dataframe

scala> df_usa3.withColumn("state_id",monotonically_increasing_id).show(50)
+--------------+----------+--------------+-----------+
|    state_name|state_abbr| state_capital|   state_id|
+--------------+----------+--------------+-----------+
|       Arizona|        AZ|       Phoenix|          0|
|      Colorado|        CO|        Denver|          1|
|       Florida|        FL|   Tallahassee|          2|
|         Idaho|        ID|         Boise|          3|
|          Iowa|        IA|    Des Moines|          4|
|     Louisiana|        LA|   Baton Rouge|          5|
| Massachusetts|        MA|        Boston|          6|
|   Mississippi|        MS|       Jackson|          7|
|        Nevada|        NV|   Carson City|          8|
|    New Mexico|        NM|      Santa Fe|          9|
|  North Dakota|        ND|      Bismarck|         10|
|        Oregon|        OR|         Salem|         11|
|South Carolina|        SC|      Columbia|         12|
|         Texas|        TX|        Austin|         13|
|      Virginia|        VA|      Richmond|         14|
|     Wisconsin|        WI|       Madison|         15|
|       Alabama|        AL|    Montgomery| 8589934592|
|      Arkansas|        AR|   Little Rock| 8589934593|
|   Connecticut|        CT|      Hartford| 8589934594|
|       Georgia|        GA|       Atlanta| 8589934595|
|      Illinois|        IL|   Springfield| 8589934596|
|        Kansas|        KS|        Topeka| 8589934597|
|         Maine|        ME|       Augusta| 8589934598|
|      Michigan|        MI|       Lansing| 8589934599|
|      Missouri|        MO|Jefferson City| 8589934600|
|       Montana|        MT|        Helena| 8589934601|
| New Hampshire|        NH|       Concord| 8589934602|
|      New York|        NY|        Albany| 8589934603|
|          Ohio|        OH|      Columbus| 8589934604|
|  Pennsylvania|        PA|    Harrisburg| 8589934605|
|  South Dakota|        SD|        Pierre| 8589934606|
|          Utah|        UT|Salt Lake City| 8589934607|
|    Washington|        WA|       Olympia| 8589934608|
|       Wyoming|        WY|      Cheyenne| 8589934609|
|        Alaska|        AK|        Juneau|17179869184|
|    California|        CA|    Sacramento|17179869185|
|      Delaware|        DE|         Dover|17179869186|
|        Hawaii|        HI|      Honolulu|17179869187|
|       Indiana|        IN|  Indianapolis|17179869188|
|      Kentucky|        KY|     Frankfort|17179869189|
|      Maryland|        MD|     Annapolis|17179869190|
|     Minnesota|        MN|      St. Paul|17179869191|
|      Nebraska|        NE|       Lincoln|17179869192|
|    New Jersey|        NJ|       Trenton|17179869193|
|North Carolina|        NC|       Raleigh|17179869194|
|      Oklahoma|        OK| Oklahoma City|17179869195|
|  Rhode Island|        RI|    Providence|17179869196|
|     Tennessee|        TN|     Nashville|17179869197|
|       Vermont|        VT|    Montpelier|17179869198|
| West Virginia|        WV|    Charleston|17179869199|
+--------------+----------+--------------+-----------+
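The IDs in these tables can be reproduced from the partition sizes alone. The output shows 26 and 24 rows for 2 partitions, and 16, 18 and 16 rows for 3 partitions. Below is a plain-Scala sketch (no Spark needed). It assumes each partition's IDs start at partition index × 8589934592, as the output shows, and count up by one per row. The names are hypothetical:

```scala
object IdRanges {
  val Offset: Long = 1L << 33 // 8589934592

  // For each partition, the (first, last) ID assigned to it, given the number
  // of rows in each partition. Assumes every partition is non-empty.
  def ranges(partitionSizes: Seq[Int]): Seq[(Long, Long)] =
    partitionSizes.zipWithIndex.map { case (size, pid) =>
      val start = pid * Offset
      (start, start + size - 1)
    }

  def main(args: Array[String]): Unit =
    // Prints: 0 .. 15, then 8589934592 .. 8589934609, then 17179869184 .. 17179869199
    ranges(Seq(16, 18, 16)).foreach { case (first, last) => println(s"$first .. $last") }
}
```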

Important points for monotonically_increasing_id

  1. For a dataframe with 1 partition, all numbers are generated in sequence without any gaps.
  2. For a dataframe with 2 partitions, numbers are generated in sequence without gaps within each partition, and each partition starts from a fixed offset.
  3. For a dataframe with 3 partitions, the same holds: numbers are consecutive within each partition, and each partition starts from its own fixed offset.
  4. There is a large jump in the generated numbers where one partition ends and the next begins, i.e. the offset is a big number.
  5. The 3 partitions contain almost the same number of rows (16, 18 and 16 in the output above). repartition(n) exchanges the data using RoundRobinPartitioning, which distributes rows evenly across partitions.
  6. The offset is 2 to the power 33, i.e. 8589934592. Each partition starts from 8589934592 * partition index, where the index starts at 0. So the 1st partition starts from 8589934592*0 = 0, the 2nd partition starts from 8589934592*1 = 8589934592, the 3rd partition starts from 8589934592*2 = 17179869184, and so on.
  7. It follows from the above point that this method assumes a single partition will not have more than 8589934592 records, i.e. about 8.6 billion records.
  8. Moving all data into a single partition to get gap-free numbers can cause memory issues if the data volume is high.
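Points 6 and 7 describe a simple bit layout. Spark's documentation for monotonically_increasing_id states that the partition ID sits in the upper 31 bits and the record number within the partition sits in the lower 33 bits. It also assumes fewer than 1 billion partitions and fewer than 8 billion records per partition. Below is a plain-Scala sketch that encodes and decodes an ID under that layout. The names are hypothetical, and this is not a Spark API:

```scala
object IdLayout {
  val RecordBits = 33
  val RecordMask: Long = (1L << RecordBits) - 1 // lower 33 bits set

  // Combine a partition index and a 0-based record number into one ID.
  def encode(partitionId: Int, recordNumber: Long): Long =
    (partitionId.toLong << RecordBits) + recordNumber

  // Split an ID back into (partition index, record number within that partition).
  def decode(id: Long): (Int, Long) =
    ((id >>> RecordBits).toInt, id & RecordMask)

  // Records that fit in one partition before IDs would spill into the next one.
  val MaxRecordsPerPartition: Long = 1L << RecordBits // 8589934592
}
```

For example, IdLayout.decode(8589934615L) gives (1, 23). That is Wisconsin, the 24th row of the second partition in the 2-partition output.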

Conclusion: If you just want unique, increasing numbers, you can use monotonically_increasing_id. However, if the numbers must be consecutive, you can use zipWithIndex (an RDD method) instead.
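Below is a minimal sketch of the zipWithIndex approach for a dataframe, assuming Spark 2.x or later. Because zipWithIndex is defined on RDDs, the sketch drops to df.rdd, appends the index to each Row and rebuilds the dataframe with an extended schema. The helper name ConsecutiveIds.withConsecutiveId is hypothetical. According to the RDD API docs, indexes follow partition order first and then row order within each partition. Computing them triggers an extra Spark job when there is more than one partition, because the rows in each partition must be counted first.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object ConsecutiveIds {
  // Appends a 0-based, gap-free Long column named colName.
  def withConsecutiveId(df: DataFrame, colName: String): DataFrame = {
    val schema = StructType(df.schema.fields :+ StructField(colName, LongType, nullable = false))
    // zipWithIndex pairs each Row with its global position across all partitions.
    val rows = df.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
    df.sparkSession.createDataFrame(rows, schema)
  }
}
```

With this helper, ConsecutiveIds.withConsecutiveId(df_usa3, "state_id") would number the 50 states from 0 to 49, even with 3 partitions.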
