
PySpark-How to Generate MD5 of entire row with columns

I was recently working on a project to migrate records from an on-premises data warehouse to S3. One requirement was to run an MD5 check on each row between source and target to confirm that the migrated data was accurate. In this post I will share a method for generating an MD5 hash for each row of a DataFrame.

I will create a dummy DataFrame with 3 columns and 4 rows, then generate an MD5 for each row. Below is the sample PySpark code.

columns = ["emp_id","emp_name","emp_dept"]
data = [("1", "Mark","Admin"), ("2", "Roger","HR"), ("3","Wanda", "Technology"),("4","Vision","Data Analytics")]
# assumes an active SparkSession named `spark` (e.g. in the pyspark shell)
rdd = spark.sparkContext.parallelize(data)
df_employee = rdd.toDF(columns)
df_employee.printSchema()
df_employee.show(truncate=False)

# DataFrame.columns already returns the column names as a list
col_list = df_employee.columns

print(col_list)

from pyspark.sql.functions import md5, concat_ws

# concatenate all columns into one string, then hash it
df_employee = df_employee.withColumn("md5", md5(concat_ws("", *col_list)))
df_employee.show(truncate=False)

The output looks like this:

+------+--------+--------------+--------------------------------+
|emp_id|emp_name|emp_dept      |md5                             |
+------+--------+--------------+--------------------------------+
|1     |Mark    |Admin         |a9bd622a5a7c17233df735117f27d30b|
|2     |Roger   |HR            |9c8cf64baa414f4327340cfcaf5da4b4|
|3     |Wanda   |Technology    |ff31440088895102bfb805c5a57b21fe|
|4     |Vision  |Data Analytics|ba6788d1e13c6e98cb8f8814ff533828|
+------+--------+--------------+--------------------------------+
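
One caveat with an empty separator: concat_ws cannot tell where one column ends and the next begins, so rows like ("ab", "c") and ("a", "bc") concatenate to the same string and hash identically. A minimal sketch of a safer variant, using a delimiter that is assumed not to occur in the data ("||" here is an arbitrary choice):

from pyspark.sql.functions import md5, concat_ws

# "||" is an arbitrary delimiter assumed absent from the data;
# it keeps ("ab", "c") and ("a", "bc") from hashing to the same value
df_employee = df_employee.withColumn("md5", md5(concat_ws("||", *col_list)))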

Instead of md5 you can also use Spark's sha1 or sha2 functions (sha2 takes a bit length of 224, 256, 384, or 512) if you need a stronger hash.
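
For example, a minimal sketch using sha2 (the second argument is the bit length):

from pyspark.sql.functions import sha2, concat_ws

# sha2 accepts a bit length of 224, 256, 384, or 512
df_employee = df_employee.withColumn("sha256", sha2(concat_ws("", *col_list), 256))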

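To close the loop on the original source-to-target check, here is a rough sketch of the comparison itself. df_source and df_target are hypothetical DataFrames that each carry an md5 column computed as above plus an emp_id key:

from pyspark.sql.functions import col

src = df_source.select("emp_id", col("md5").alias("src_md5"))
tgt = df_target.select("emp_id", col("md5").alias("tgt_md5"))

# a full outer join surfaces rows whose hashes differ as well as
# rows that exist on only one side of the migration
mismatches = (
    src.join(tgt, on="emp_id", how="full_outer")
       .where(col("src_md5").isNull()
              | col("tgt_md5").isNull()
              | (col("src_md5") != col("tgt_md5")))
)
mismatches.show(truncate=False)

An empty mismatches result is the confidence check the migration needed.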

2 thoughts on “PySpark-How to Generate MD5 of entire row with columns”

    1. Tried below with DATE & NULL values:

      columns = ["emp_id","emp_name","emp_dept","emp_dob"]
      data = [("1", "Mark","Admin",""), ("2", "Roger","HR","1980-02-23"), ("3","Wanda", "Technology","1988-05-20"),("4","Vision","Data Analytics","1990-10-02")]
      rdd = spark.sparkContext.parallelize(data)
      df_employee = rdd.toDF(columns)
      df_employee.printSchema()
      df_employee.show(truncate=False)
      
      # DataFrame.columns already returns the column names as a list
      col_list = df_employee.columns

      print(col_list)
      
      from pyspark.sql.functions import md5, concat_ws, col

      # the empty-string date in row 1 becomes NULL after the cast
      df_employee = df_employee.withColumn("emp_dob", col("emp_dob").cast("date"))
      df_employee.printSchema()
      
      df_employee = df_employee.withColumn("md5", md5(concat_ws("", *col_list)))
      df_employee.show(truncate=False)
      
      +------+--------+--------------+----------+--------------------------------+
      |emp_id|emp_name|emp_dept      |emp_dob   |md5                             |
      +------+--------+--------------+----------+--------------------------------+
      |1     |Mark    |Admin         |null      |36ed0a3cf767c290290f0ed5cd743afb|
      |2     |Roger   |HR            |1980-02-23|7b1cf73e3ba8a7dac9f68729d68064ac|
      |3     |Wanda   |Technology    |1988-05-20|c67f1f914156ad89d7bd214658e26e86|
      |4     |Vision  |Data Analytics|1990-10-02|900ba57f5c2021d24dd62f8f1bacdc92|
      +------+--------+--------------+----------+--------------------------------+
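
One note on the output above: concat_ws silently drops NULLs, so a row where emp_dob is NULL is hashed as if the column were not there at all, and two rows that differ only in which column is NULL can collide. A sketch of a null-safe variant, assuming a sentinel string that cannot occur in the real data:

from pyspark.sql.functions import md5, concat_ws, coalesce, col, lit

# replace NULLs with a sentinel ("\x00NULL\x00" is an arbitrary choice)
# so that NULL positions still contribute to the hash
null_safe_cols = [coalesce(col(c).cast("string"), lit("\x00NULL\x00"))
                  for c in df_employee.columns if c != "md5"]
df_employee = df_employee.withColumn("md5", md5(concat_ws("||", *null_safe_cols)))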
      
