One of the most common operations in any data-analytics environment is generating sequence numbers. There are multiple ways of doing this in Spark, but I find zipWithIndex the best option when simplicity and performance are considered together: it generates consecutive sequence numbers, without any gaps, for a given dataset.
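To see what zipWithIndex does on its own, here is a minimal sketch (assuming a SparkContext named sc, as in a standard PySpark shell):

rdd = sc.parallelize(["a", "b", "c", "d"])
# zipWithIndex pairs every element with a consecutive, gap-free index starting at 0
print(rdd.zipWithIndex().collect())
# [('a', 0), ('b', 1), ('c', 2), ('d', 3)]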
Below is the detailed code, which will help in generating surrogate keys/sequence numbers for a dataframe.
Create a dataframe
df_0 = sqlContext.sql("select pres_name,pres_dob,pres_bp,pres_bs,pres_in,pres_out from usa_prez")
df_0.printSchema()
root
 |-- pres_name: string (nullable = true)
 |-- pres_dob: date (nullable = true)
 |-- pres_bp: string (nullable = true)
 |-- pres_bs: string (nullable = true)
 |-- pres_in: date (nullable = true)
 |-- pres_out: date (nullable = true)
Add a new column for sequence
In this example we are using an integer placeholder. If you need a bigger range, change lit(1) to lit(long(1)) (Python 2 only) or cast the column to LongType, as shown in the sketch after the schema output below. Also, import the lit function from the pyspark.sql.functions package.
from pyspark.sql.functions import lit
df_0_schema = df_0.withColumn("pres_id", lit(1))
df_0_schema.printSchema()
root
 |-- pres_name: string (nullable = true)
 |-- pres_dob: date (nullable = true)
 |-- pres_bp: string (nullable = true)
 |-- pres_bs: string (nullable = true)
 |-- pres_in: date (nullable = true)
 |-- pres_out: date (nullable = true)
 |-- pres_id: integer (nullable = false)
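If you prefer a LongType placeholder, one option that works on both Python 2 and Python 3 is a cast on the literal (a minimal sketch using Column.cast):

from pyspark.sql.functions import lit
# casting the literal makes pres_id a LongType column instead of IntegerType
df_0_schema = df_0.withColumn("pres_id", lit(1).cast("long"))
df_0_schema.printSchema()  # pres_id now shows as long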
Apply zipWithIndex to rdd from dataframe
zipWithIndex is a method on Resilient Distributed Datasets (RDDs), so we first have to convert the existing dataframe into an RDD. Since zipWithIndex starts its indices at 0 and we want to start from 1, we add 1 in "[rowId+1]"; replace 1 with your own offset value if you need a different starting point. We also have to append the newly generated number to the existing row values, which is what the lambda function does.
# if using Python 2 (tuple unpacking in lambdas is allowed)
rdd_1 = df_0.rdd.zipWithIndex().map(lambda (row, rowId): list(row) + [rowId + 1])

# if using Python 3 (tuple unpacking was removed, so index into the (row, rowId) tuple)
rdd_1 = df_0.rdd.zipWithIndex().map(lambda x: list(x[0]) + [x[1] + 1])
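If you need the sequence to start from an arbitrary value rather than 1, one possible variation (a sketch; the offset of 1000 is only illustrative) is:

offset = 1000  # hypothetical starting value for the sequence
rdd_1 = df_0.rdd.zipWithIndex().map(lambda x: list(x[0]) + [x[1] + offset])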
Convert rdd back to dataframe
Since a dataframe has schema information associated with it, we will impose that structure on the data calculated in the previous step.
df_1 = sqlContext.createDataFrame(rdd_1, schema=df_0_schema.schema)
df_1.printSchema()
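Instead of reusing df_0_schema.schema, you could also spell the schema out explicitly (a sketch listing only the columns used in this example):

from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType

prez_schema = StructType([
    StructField("pres_name", StringType(), True),
    StructField("pres_dob", DateType(), True),
    StructField("pres_bp", StringType(), True),
    StructField("pres_bs", StringType(), True),
    StructField("pres_in", DateType(), True),
    StructField("pres_out", DateType(), True),
    StructField("pres_id", IntegerType(), False),  # the generated sequence column
])
df_1 = sqlContext.createDataFrame(rdd_1, schema=prez_schema)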
Check the data in dataframe
You can also register it as a temp table if you want to query it through sqlContext.
df_1.show(50)
df_1.registerTempTable("usa_prez_tmp")
sqlContext.sql("select * from usa_prez_tmp").show(50)
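Note that registerTempTable is deprecated since Spark 2.0. On a SparkSession-based setup, the equivalent calls (a sketch assuming a SparkSession named spark) would be:

df_1.createOrReplaceTempView("usa_prez_tmp")
spark.sql("select * from usa_prez_tmp").show(50)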
Putting everything together, below is the complete code:
from pyspark.sql.functions import lit

df_0 = sqlContext.sql("select pres_name,pres_dob,pres_bp,pres_bs,pres_in,pres_out from usa_prez")
df_0.printSchema()

df_0_schema = df_0.withColumn("pres_id", lit(1))
df_0_schema.printSchema()

# Python 3: rdd_1 = df_0.rdd.zipWithIndex().map(lambda x: list(x[0]) + [x[1] + 1])
rdd_1 = df_0.rdd.zipWithIndex().map(lambda (row, rowId): list(row) + [rowId + 1])

df_1 = sqlContext.createDataFrame(rdd_1, schema=df_0_schema.schema)
df_1.printSchema()

df_1.show(50)
df_1.registerTempTable("usa_prez_tmp")
sqlContext.sql("select * from usa_prez_tmp").show(10)
Output of above commands:
+--------------------+----------+-------------------+--------------------+----------+----------+-------+
|           pres_name|  pres_dob|            pres_bp|             pres_bs|   pres_in|  pres_out|pres_id|
+--------------------+----------+-------------------+--------------------+----------+----------+-------+
|   George Washington|1732-02-22|Westmoreland County|            Virginia|1789-04-30|1797-03-04|      1|
|          John Adams|1735-10-30|          Braintree|       Massachusetts|1797-03-04|1801-03-04|      2|
|    Thomas Jefferson|1743-04-13|           Shadwell|            Virginia|1801-03-04|1809-03-04|      3|
|       James Madison|1751-03-16|        Port Conway|            Virginia|1809-03-04|1817-03-04|      4|
|        James Monroe|1758-04-28|        Monroe Hall|            Virginia|1817-03-04|1825-03-04|      5|
|   John Quincy Adams|1767-07-11|          Braintree|       Massachusetts|1825-03-04|1829-03-04|      6|
|      Andrew Jackson|1767-03-15|     Waxhaws Region|South/North Carolina|1829-03-04|1837-03-04|      7|
|    Martin Van Buren|1782-12-05|         Kinderhook|            New York|1837-03-04|1841-03-04|      8|
|William Henry Har...|1773-02-09|Charles City County|            Virginia|1841-03-04|1841-04-04|      9|
|          John Tyler|1790-03-29|Charles City County|            Virginia|1841-04-04|1845-03-04|     10|
+--------------------+----------+-------------------+--------------------+----------+----------+-------+
Check the last column, "pres_id": it is the generated sequence number.
Conclusion: If you want consecutive sequence numbers, then zipWithIndex is the way to go in Spark. However, if you only need unique, increasing (but not necessarily consecutive) numbers, then monotonically_increasing_id is the preferred option.
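For reference, a minimal sketch of the monotonically_increasing_id alternative (the generated ids are unique and increasing, but not consecutive across partitions):

from pyspark.sql.functions import monotonically_increasing_id
# ids are generated per partition, so they are unique but can have large gaps
df_mono = df_0.withColumn("pres_id", monotonically_increasing_id())
df_mono.show(10)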
This line throws an error:
rdd_1 = df_0.rdd.zipWithIndex().map(lambda (row,rowId): ( list(row) + [rowId+1]))
It says map is being called with only one param.
Hi Jim
If you are using Python 3, the lambda syntax has changed: tuple parameter unpacking is no longer supported, so you have to index into the (row, rowId) tuple.
Please try below:
rdd_1 = df_0.rdd.zipWithIndex().map(lambda x: list(x[0]) + [x[1] + 1])
—
Raj
Nice representation….
One question over here: how can we pass a parameter to generate the sequence from a particular starting point? Is there any way to achieve this?
Also, this code belongs to Python 2.7; is there any way to run it with Python 3.6?