PySpark Cheat Sheet

Topics Covered

This cheat sheet collects PySpark code snippets, covering common DataFrame operations as well as some scenario-based examples. I add new snippets regularly; if there is something specific you would like covered, request it in the comments and I will try to add it quickly. You are also welcome to share your own snippets in the comments, and I will add them to the main list.

PySpark Code Snippets

PySpark – Create Dataframe from CSV in S3

df_taxi = spark.read.option("header","true").option("delimiter",",").option("inferSchema","true").csv("s3://nyc-tlc/trip data/yellow_tripdata_2020-12.csv")

PySpark – Print Dataframe Schema

df_taxi.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

PySpark – Check row count in dataframe

df_taxi.count()

1461897

PySpark – Print all column names in dataframe

df_taxi.columns

['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge']

PySpark – Check datatype of all columns in dataframe

df_taxi.dtypes

[('VendorID', 'int'), ('tpep_pickup_datetime', 'timestamp'), ('tpep_dropoff_datetime', 'timestamp'), ('passenger_count', 'int'), ('trip_distance', 'double'), ('RatecodeID', 'int'), ('store_and_fwd_flag', 'string'), ('PULocationID', 'int'), ('DOLocationID', 'int'), ('payment_type', 'int'), ('fare_amount', 'double'), ('extra', 'double'), ('mta_tax', 'double'), ('tip_amount', 'double'), ('tolls_amount', 'double'), ('improvement_surcharge', 'double'), ('total_amount', 'double'), ('congestion_surcharge', 'double')]

PySpark – Pick 5 rows from dataframe and return list

df_taxi.take(5)

[Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2020, 12, 1, 0, 7, 13), tpep_dropoff_datetime=datetime.datetime(2020, 12, 1, 0, 18, 12), passenger_count=1, trip_distance=7.6, RatecodeID=1, store_and_fwd_flag='N', PULocationID=138, DOLocationID=263, payment_type=1, fare_amount=21.5, extra=3.0, mta_tax=0.5, tip_amount=2.5, tolls_amount=6.12, improvement_surcharge=0.3, total_amount=33.92, congestion_surcharge=2.5), 
Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2020, 12, 1, 0, 41, 19), tpep_dropoff_datetime=datetime.datetime(2020, 12, 1, 0, 49, 45), passenger_count=1, trip_distance=1.6, RatecodeID=1, store_and_fwd_flag='N', PULocationID=140, DOLocationID=263, payment_type=1, fare_amount=8.0, extra=3.0, mta_tax=0.5, tip_amount=2.95, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=14.75, congestion_surcharge=2.5), 
Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2020, 12, 1, 0, 33, 40), tpep_dropoff_datetime=datetime.datetime(2020, 12, 1, 1, 0, 35), passenger_count=1, trip_distance=16.74, RatecodeID=2, store_and_fwd_flag='N', PULocationID=132, DOLocationID=164, payment_type=1, fare_amount=52.0, extra=0.0, mta_tax=0.5, tip_amount=2.5, tolls_amount=6.12, improvement_surcharge=0.3, total_amount=63.92, congestion_surcharge=2.5), 
Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2020, 12, 1, 0, 2, 15), tpep_dropoff_datetime=datetime.datetime(2020, 12, 1, 0, 13, 9), passenger_count=1, trip_distance=4.16, RatecodeID=1, store_and_fwd_flag='N', PULocationID=238, DOLocationID=48, payment_type=1, fare_amount=14.0, extra=0.5, mta_tax=0.5, tip_amount=1.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=18.8, congestion_surcharge=2.5), 
Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2020, 12, 1, 0, 37, 42), tpep_dropoff_datetime=datetime.datetime(2020, 12, 1, 0, 45, 11), passenger_count=1, trip_distance=2.22, RatecodeID=1, store_and_fwd_flag='N', PULocationID=238, DOLocationID=41, payment_type=2, fare_amount=8.5, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=9.8, congestion_surcharge=0.0)]

PySpark – Pick 5 rows from dataframe and return dataframe

df_taxi.limit(5).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2020-12-01 00:07:13|  2020-12-01 00:18:12|              1|          7.6|         1|                 N|         138|         263|           1|       21.5|  3.0|    0.5|       2.5|        6.12|                  0.3|       33.92|                 2.5|
|       1| 2020-12-01 00:41:19|  2020-12-01 00:49:45|              1|          1.6|         1|                 N|         140|         263|           1|        8.0|  3.0|    0.5|      2.95|         0.0|                  0.3|       14.75|                 2.5|
|       2| 2020-12-01 00:33:40|  2020-12-01 01:00:35|              1|        16.74|         2|                 N|         132|         164|           1|       52.0|  0.0|    0.5|       2.5|        6.12|                  0.3|       63.92|                 2.5|
|       2| 2020-12-01 00:02:15|  2020-12-01 00:13:09|              1|         4.16|         1|                 N|         238|          48|           1|       14.0|  0.5|    0.5|       1.0|         0.0|                  0.3|        18.8|                 2.5|
|       2| 2020-12-01 00:37:42|  2020-12-01 00:45:11|              1|         2.22|         1|                 N|         238|          41|           2|        8.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         9.8|                 0.0|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+

PySpark – Show only 5 rows in output

Show only the first 5 rows by passing a row count to "show".

df_taxi.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2020-12-01 00:07:13|  2020-12-01 00:18:12|              1|          7.6|         1|                 N|         138|         263|           1|       21.5|  3.0|    0.5|       2.5|        6.12|                  0.3|       33.92|                 2.5|
|       1| 2020-12-01 00:41:19|  2020-12-01 00:49:45|              1|          1.6|         1|                 N|         140|         263|           1|        8.0|  3.0|    0.5|      2.95|         0.0|                  0.3|       14.75|                 2.5|
|       2| 2020-12-01 00:33:40|  2020-12-01 01:00:35|              1|        16.74|         2|                 N|         132|         164|           1|       52.0|  0.0|    0.5|       2.5|        6.12|                  0.3|       63.92|                 2.5|
|       2| 2020-12-01 00:02:15|  2020-12-01 00:13:09|              1|         4.16|         1|                 N|         238|          48|           1|       14.0|  0.5|    0.5|       1.0|         0.0|                  0.3|        18.8|                 2.5|
|       2| 2020-12-01 00:37:42|  2020-12-01 00:45:11|              1|         2.22|         1|                 N|         238|          41|           2|        8.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         9.8|                 0.0|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
only showing top 5 rows

PySpark – Select column from dataframe

Select a subset of columns using "select", which accepts one or more column names as arguments.

df_taxi.select("vendorid","tpep_pickup_datetime","tpep_dropoff_datetime","passenger_count","trip_distance","total_amount").show(5)

+--------+--------------------+---------------------+---------------+-------------+------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------+
|       1| 2020-12-01 00:07:13|  2020-12-01 00:18:12|              1|          7.6|       33.92|
|       1| 2020-12-01 00:41:19|  2020-12-01 00:49:45|              1|          1.6|       14.75|
|       2| 2020-12-01 00:33:40|  2020-12-01 01:00:35|              1|        16.74|       63.92|
|       2| 2020-12-01 00:02:15|  2020-12-01 00:13:09|              1|         4.16|        18.8|
|       2| 2020-12-01 00:37:42|  2020-12-01 00:45:11|              1|         2.22|         9.8|
+--------+--------------------+---------------------+---------------+-------------+------------+
only showing top 5 rows

PySpark – Rename columns in Dataframe

Rename columns using "withColumnRenamed" in PySpark dataframe.

df_taxi.withColumnRenamed("tpep_pickup_datetime","pickup_time").withColumnRenamed("tpep_dropoff_datetime","drop_time").select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount").show(5)

+--------+-------------------+-------------------+---------------+-------------+------------+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|
+--------+-------------------+-------------------+---------------+-------------+------------+
|       1|2020-12-01 00:07:13|2020-12-01 00:18:12|              1|          7.6|       33.92|
|       1|2020-12-01 00:41:19|2020-12-01 00:49:45|              1|          1.6|       14.75|
|       2|2020-12-01 00:33:40|2020-12-01 01:00:35|              1|        16.74|       63.92|
|       2|2020-12-01 00:02:15|2020-12-01 00:13:09|              1|         4.16|        18.8|
|       2|2020-12-01 00:37:42|2020-12-01 00:45:11|              1|         2.22|         9.8|
+--------+-------------------+-------------------+---------------+-------------+------------+
only showing top 5 rows

PySpark – Rename LIST of columns in Dataframe

Rename a list of columns in the dataframe by looping over pairs of old and new names. We will use the Python zip function to pair the two lists.

existing_cols = ["tpep_pickup_datetime","tpep_dropoff_datetime"]
new_cols = ["pickup_time","drop_time"]
print(list(zip(existing_cols, new_cols)))

[('tpep_pickup_datetime', 'pickup_time'), ('tpep_dropoff_datetime', 'drop_time')]


for i, j in zip(existing_cols, new_cols):
    df_taxi = df_taxi.withColumnRenamed(i, j)

df_taxi.select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount").show(5)

+--------+-------------------+-------------------+---------------+-------------+------------+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|
+--------+-------------------+-------------------+---------------+-------------+------------+
|       1|2020-12-01 00:07:13|2020-12-01 00:18:12|              1|          7.6|       33.92|
|       1|2020-12-01 00:41:19|2020-12-01 00:49:45|              1|          1.6|       14.75|
|       2|2020-12-01 00:33:40|2020-12-01 01:00:35|              1|        16.74|       63.92|
|       2|2020-12-01 00:02:15|2020-12-01 00:13:09|              1|         4.16|        18.8|
|       2|2020-12-01 00:37:42|2020-12-01 00:45:11|              1|         2.22|         9.8|
+--------+-------------------+-------------------+---------------+-------------+------------+
only showing top 5 rows
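The pairing step itself is plain Python and needs no Spark. As a sketch (using a shortened column list for illustration), you can also precompute the full renamed column list with a dict built from zip:

```python
# Build an old-name -> new-name lookup from the two lists (plain Python).
existing_cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
new_cols = ["pickup_time", "drop_time"]
rename_map = dict(zip(existing_cols, new_cols))

# Rewrite the full column list, leaving unmapped names unchanged.
all_cols = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance"]
renamed = [rename_map.get(c, c) for c in all_cols]
print(renamed)  # ['VendorID', 'pickup_time', 'drop_time', 'trip_distance']
```

With the renamed list computed over all the dataframe's columns, df_taxi.toDF(*renamed) would rename everything in one call instead of chaining withColumnRenamed.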

PySpark – Describe Dataframe

describe returns summary statistics (count, mean, stddev, min, max) for the columns in the dataframe.

df_taxi.describe().show()

+-------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+
|summary|          VendorID|   passenger_count|    trip_distance|        RatecodeID|store_and_fwd_flag|     PULocationID|      DOLocationID|      payment_type|       fare_amount|             extra|            mta_tax|        tip_amount|       tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|
+-------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+
|  count|           1362441|           1362441|          1461897|           1362441|           1362441|          1461897|           1461897|           1362441|           1461897|           1461897|            1461897|           1461897|            1461897|              1461897|           1461897|             1461897|
|   mean|1.6853148136322968|1.4193128362989664|6.070390177967453|  1.02891501356756|              null|166.1230250831625|162.04916009814644|1.2890679302810177|12.213858596053779| 0.932831027083303| 0.4924785740719079|1.9610265292287639|0.21780083685779542|  0.29669258504199447|17.598124874775678|  2.1472160829388116|
| stddev|0.4643905448400174|1.0633235169199586|886.5289264178996|0.5479180986778657|              null|67.83917050363657| 71.80259558721558|0.4967290209582704| 329.7733338785015|1.2030748172566303|0.07956615727017227| 2.637018116691093|  1.287813125568131|  0.04391618443743257| 329.8461373300217|  0.9017798324971192|
|    min|                 1|                 0|              0.0|                 1|                 N|                1|                 1|                 1|            -500.0|              -4.5|               -0.5|            -22.55|             -29.62|                 -0.3|            -502.8|                -2.5|
|    max|                 2|                 9|        350914.89|                99|                 Y|              265|               265|                 4|         398464.88|              7.43|                3.3|           1393.56|             102.25|                  0.3|          398467.7|                 2.5|
+-------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+

PySpark – Check number of partitions in Dataframe

We will call the getNumPartitions() method on the dataframe's underlying RDD to get the number of partitions.

df_taxi.rdd.getNumPartitions()

2
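Why 2 partitions? A plausible explanation, assuming defaults: Spark plans file reads in chunks of at most spark.sql.files.maxPartitionBytes (128 MB by default), and this dataset is roughly 149 MB as a single CSV. A hedged back-of-the-envelope sketch, not the planner's exact algorithm:

```python
import math

# Assumption: the default 128 MB cap on read-partition size. The real planner
# also weighs spark.sql.files.openCostInBytes and available cores, so this is
# an approximation only.
MAX_PARTITION_BYTES = 128 * 1024 * 1024
file_size_bytes = int(149.1 * 1024 * 1024)  # approx. size of this dataset as one CSV

num_partitions = math.ceil(file_size_bytes / MAX_PARTITION_BYTES)
print(num_partitions)  # 2
```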

PySpark – Write Dataframe to CSV

The write produces 2 part files because the dataframe has 2 partitions.

df_taxi.write.format("csv").option("header","true").save("hdfs:///var/data/taxi/")

#check the output file using shell command
hdfs dfs -ls -h hdfs:///var/data/taxi/

Found 3 items
-rw-r--r--   1 hadoop hdfsadmingroup          0 2021-06-30 11:36 hdfs:///var/data/taxi/_SUCCESS
-rw-r--r--   1 hadoop hdfsadmingroup     76.3 M 2021-06-30 11:36 hdfs:///var/data/taxi/part-00000-cbee1ee9-7ef6-43a1-9e2f-8542f2e6a8a1-c000.csv
-rw-r--r--   1 hadoop hdfsadmingroup     72.8 M 2021-06-30 11:36 hdfs:///var/data/taxi/part-00001-cbee1ee9-7ef6-43a1-9e2f-8542f2e6a8a1-c000.csv

PySpark – Write Dataframe to CSV in single part

We will use coalesce to reduce the number of partitions to 1 before writing to CSV, producing a single part file.

df_taxi.coalesce(1).write.format("csv").option("header","true").save("hdfs:///var/data/taxi1/")

#check the output file using shell command
hdfs dfs -ls -h hdfs:///var/data/taxi1/

Found 2 items
-rw-r--r--   1 hadoop hdfsadmingroup          0 2021-06-30 11:43 hdfs:///var/data/taxi1/_SUCCESS
-rw-r--r--   1 hadoop hdfsadmingroup    149.1 M 2021-06-30 11:43 hdfs:///var/data/taxi1/part-00000-ee55afc0-10a6-4242-ba85-70aca302360a-c000.csv

PySpark – Write Dataframe to CSV into multiple parts

We will use "repartition" to increase the number of partitions to 5 before writing to CSV.

df_taxi.repartition(5).write.format("csv").option("header","true").save("hdfs:///var/data/taxi2/")

#check the output file using shell command
hdfs dfs -ls -h hdfs:///var/data/taxi2/

Found 6 items
-rw-r--r--   1 hadoop hdfsadmingroup          0 2021-06-30 11:45 hdfs:///var/data/taxi2/_SUCCESS
-rw-r--r--   1 hadoop hdfsadmingroup     29.8 M 2021-06-30 11:45 hdfs:///var/data/taxi2/part-00000-4ba901d0-1cd5-4f24-be59-3b798c566a9b-c000.csv
-rw-r--r--   1 hadoop hdfsadmingroup     29.8 M 2021-06-30 11:45 hdfs:///var/data/taxi2/part-00001-4ba901d0-1cd5-4f24-be59-3b798c566a9b-c000.csv
-rw-r--r--   1 hadoop hdfsadmingroup     29.8 M 2021-06-30 11:45 hdfs:///var/data/taxi2/part-00002-4ba901d0-1cd5-4f24-be59-3b798c566a9b-c000.csv
-rw-r--r--   1 hadoop hdfsadmingroup     29.8 M 2021-06-30 11:45 hdfs:///var/data/taxi2/part-00003-4ba901d0-1cd5-4f24-be59-3b798c566a9b-c000.csv
-rw-r--r--   1 hadoop hdfsadmingroup     29.8 M 2021-06-30 11:45 hdfs:///var/data/taxi2/part-00004-4ba901d0-1cd5-4f24-be59-3b798c566a9b-c000.csv

PySpark – Write Dataframe to CSV by repartitioning on a column

We can pass a column name so that the data is repartitioned by that column before writing, producing separate file parts per value. This may also create some empty part files, because the default number of shuffle partitions is 200. The data has 3 distinct values for the "VendorID" column (including null), hence 3 part files with data in the output.

from pyspark.sql.functions import col
df_taxi.repartition(col("VendorID")).write.format("csv").option("header","true").save("hdfs:///var/data/taxi3/")

#check the output file using shell command
hdfs dfs -ls -h hdfs:///var/data/taxi3/
  
Found 5 items
-rw-r--r--   1 hadoop hdfsadmingroup          0 2021-06-30 11:48 hdfs:///var/data/taxi3/_SUCCESS
-rw-r--r--   1 hadoop hdfsadmingroup          0 2021-06-30 11:48 hdfs:///var/data/taxi3/part-00000-6fcbbe85-6cd2-4522-9a44-8c1d87041f8d-c000.csv
-rw-r--r--   1 hadoop hdfsadmingroup     10.8 M 2021-06-30 11:48 hdfs:///var/data/taxi3/part-00042-6fcbbe85-6cd2-4522-9a44-8c1d87041f8d-c000.csv
-rw-r--r--   1 hadoop hdfsadmingroup     43.2 M 2021-06-30 11:48 hdfs:///var/data/taxi3/part-00043-6fcbbe85-6cd2-4522-9a44-8c1d87041f8d-c000.csv
-rw-r--r--   1 hadoop hdfsadmingroup     95.1 M 2021-06-30 11:48 hdfs:///var/data/taxi3/part-00174-6fcbbe85-6cd2-4522-9a44-8c1d87041f8d-c000.csv
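The scattered part numbers (00000, 00042, 00043, 00174) come from hash partitioning: each VendorID value (including null) is hashed into one of the 200 default shuffle partitions. A hedged pure-Python sketch of the idea, using Python's hash() as a stand-in for Spark's Murmur3 hash, so the bucket numbers will not match the real part numbers:

```python
# Assumption: 200 buckets, matching the default of spark.sql.shuffle.partitions.
NUM_SHUFFLE_PARTITIONS = 200

def bucket(key):
    # Python's % returns a non-negative result for a positive modulus,
    # so every key (None included) lands in a valid bucket.
    return hash(key) % NUM_SHUFFLE_PARTITIONS

vendor_ids = [1, 2, None]  # the 3 distinct values in this dataset
print({v: bucket(v) for v in vendor_ids})
```

Most of the 200 buckets receive no rows at all, which is where the empty part files come from.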

PySpark – Add new column to the dataframe with static value

We will use "withColumn" to add a new column to the existing dataframe. The new column gets a static value via the "lit" function.

from pyspark.sql.functions import lit

df_taxi.withColumn("bonus",lit(1)).select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount","bonus").show(5)

+--------+-------------------+-------------------+---------------+-------------+------------+-----+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|bonus|
+--------+-------------------+-------------------+---------------+-------------+------------+-----+
|       1|2020-12-01 00:07:13|2020-12-01 00:18:12|              1|          7.6|       33.92|    1|
|       1|2020-12-01 00:41:19|2020-12-01 00:49:45|              1|          1.6|       14.75|    1|
|       2|2020-12-01 00:33:40|2020-12-01 01:00:35|              1|        16.74|       63.92|    1|
|       2|2020-12-01 00:02:15|2020-12-01 00:13:09|              1|         4.16|        18.8|    1|
|       2|2020-12-01 00:37:42|2020-12-01 00:45:11|              1|         2.22|         9.8|    1|
+--------+-------------------+-------------------+---------------+-------------+------------+-----+
only showing top 5 rows

PySpark – Add new column to the dataframe with derived value

df_taxi.withColumn("bonus",col("total_amount")*0.1).select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount","bonus").show(5)

+--------+-------------------+-------------------+---------------+-------------+------------+------------------+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|             bonus|
+--------+-------------------+-------------------+---------------+-------------+------------+------------------+
|       1|2020-12-01 00:07:13|2020-12-01 00:18:12|              1|          7.6|       33.92|3.3920000000000003|
|       1|2020-12-01 00:41:19|2020-12-01 00:49:45|              1|          1.6|       14.75|             1.475|
|       2|2020-12-01 00:33:40|2020-12-01 01:00:35|              1|        16.74|       63.92|             6.392|
|       2|2020-12-01 00:02:15|2020-12-01 00:13:09|              1|         4.16|        18.8|1.8800000000000001|
|       2|2020-12-01 00:37:42|2020-12-01 00:45:11|              1|         2.22|         9.8|0.9800000000000001|
+--------+-------------------+-------------------+---------------+-------------+------------+------------------+
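The long decimals in "bonus" (e.g. 3.3920000000000003) are ordinary binary floating-point rounding: total_amount is a double and 0.1 has no exact binary representation. Plain Python reproduces the effect:

```python
# 33.92 * 0.1 cannot be represented exactly as a binary double, so the
# product carries rounding noise in its last digits.
bonus = 33.92 * 0.1
print(bonus)            # displays with trailing noise, e.g. 3.3920000000000003
print(round(bonus, 2))  # 3.39
```

For a cleaner display, pyspark.sql.functions.round(col("total_amount") * 0.1, 2) keeps the derived column to two decimal places.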

PySpark – Cast column to different datatype in dataframe

We will convert a column to another datatype using the "cast" method.

df_taxi.withColumn("bonus",(col("total_amount")*0.1).cast("integer")).select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount","bonus").show(5)

+--------+-------------------+-------------------+---------------+-------------+------------+-----+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|bonus|
+--------+-------------------+-------------------+---------------+-------------+------------+-----+
|       1|2020-12-01 00:07:13|2020-12-01 00:18:12|              1|          7.6|       33.92|    3|
|       1|2020-12-01 00:41:19|2020-12-01 00:49:45|              1|          1.6|       14.75|    1|
|       2|2020-12-01 00:33:40|2020-12-01 01:00:35|              1|        16.74|       63.92|    6|
|       2|2020-12-01 00:02:15|2020-12-01 00:13:09|              1|         4.16|        18.8|    1|
|       2|2020-12-01 00:37:42|2020-12-01 00:45:11|              1|         2.22|         9.8|    0|
+--------+-------------------+-------------------+---------------+-------------+------------+-----+
only showing top 5 rows
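Note that casting a double to integer truncates toward zero rather than rounding, which is why the 9.8 fare (bonus 0.98) ends up with a bonus of 0 above. Python's built-in int() truncates the same way:

```python
# Truncation toward zero, matching Spark's cast("integer") on doubles.
print(int(3.392))  # 3   (from 33.92 * 0.1)
print(int(0.98))   # 0   (from 9.8 * 0.1 -- hence the 0 bonus above)
print(int(-1.7))   # -1  (truncates toward zero, not floor)
```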

PySpark – Calculate count on aggregate data in dataframe

df_taxi.groupBy("VendorID").count().show()

+--------+------+
|VendorID| count|
+--------+------+
|    null| 99456|
|       1|428740|
|       2|933701|
+--------+------+

PySpark – Calculate average of columns on aggregate data

df_taxi.groupBy("VendorID").avg("trip_distance","total_amount").show()

+--------+------------------+------------------+
|VendorID|avg(trip_distance)| avg(total_amount)|
+--------+------------------+------------------+
|    null| 55.74605383285073| 31.10511090331231|
|       1|  2.37059429957549| 17.32007029899172|
|       2|2.4779207262282674|16.287065248877628|
+--------+------------------+------------------+

PySpark – Calculate sum of columns on aggregate data

df_taxi.groupBy("VendorID").sum("trip_distance","total_amount").show()

+--------+------------------+-------------------+
|VendorID|sum(trip_distance)|  sum(total_amount)|
+--------+------------------+-------------------+
|    null| 5544279.530000002|  3093589.909999829|
|       1|1016368.5999999954|   7425806.93998971|
|       2|2313637.0600000597|1.520724910994229E7|
+--------+------------------+-------------------+

PySpark – Calculate maximum of columns on aggregate data

df_taxi.groupBy("VendorID").max("trip_distance","total_amount").show()

+--------+------------------+-----------------+
|VendorID|max(trip_distance)|max(total_amount)|
+--------+------------------+-----------------+
|    null|         350914.89|           158.41|
|       1|             283.7|         398467.7|
|       2|            407.78|          8361.36|
+--------+------------------+-----------------+

PySpark – Calculate minimum of columns on aggregate data

df_taxi.groupBy("VendorID").min("trip_distance","total_amount").show()

+--------+------------------+-----------------+
|VendorID|min(trip_distance)|min(total_amount)|
+--------+------------------+-----------------+
|    null|               0.0|           -56.32|
|       1|               0.0|              0.0|
|       2|               0.0|           -502.8|
+--------+------------------+-----------------+

PySpark – Merge two Dataframe data into one

We will use unionAll to merge the data from two dataframes into one. Note that unionAll is a deprecated alias for union since Spark 2.0, and both resolve columns by position, not by name.

df_taxi1 = df_taxi.select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount").filter("VendorID==1").limit(5)

df_taxi1.show()

+--------+-------------------+-------------------+---------------+-------------+------------+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|
+--------+-------------------+-------------------+---------------+-------------+------------+
|       1|2020-12-01 00:07:13|2020-12-01 00:18:12|              1|          7.6|       33.92|
|       1|2020-12-01 00:41:19|2020-12-01 00:49:45|              1|          1.6|       14.75|
|       1|2020-12-01 00:27:47|2020-12-01 00:45:40|              0|          8.4|       40.92|
|       1|2020-12-01 00:08:15|2020-12-01 00:16:04|              2|          2.7|       15.95|
|       1|2020-12-01 00:14:34|2020-12-01 00:31:04|              1|          7.6|       32.15|
+--------+-------------------+-------------------+---------------+-------------+------------+


df_taxi2 = df_taxi.select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount").filter("VendorID==2").limit(5)

df_taxi2.show()

+--------+-------------------+-------------------+---------------+-------------+------------+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|
+--------+-------------------+-------------------+---------------+-------------+------------+
|       2|2020-12-01 00:33:40|2020-12-01 01:00:35|              1|        16.74|       63.92|
|       2|2020-12-01 00:02:15|2020-12-01 00:13:09|              1|         4.16|        18.8|
|       2|2020-12-01 00:37:42|2020-12-01 00:45:11|              1|         2.22|         9.8|
|       2|2020-12-01 00:40:47|2020-12-01 00:57:03|              1|         6.44|       24.96|
|       2|2020-12-01 00:01:42|2020-12-01 00:06:06|              1|         0.99|       11.16|
+--------+-------------------+-------------------+---------------+-------------+------------+

# UNION ALL to MERGE 2 dataframe data
df_taxi1.unionAll(df_taxi2).show()

+--------+-------------------+-------------------+---------------+-------------+------------+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|
+--------+-------------------+-------------------+---------------+-------------+------------+
|       1|2020-12-01 00:07:13|2020-12-01 00:18:12|              1|          7.6|       33.92|
|       1|2020-12-01 00:41:19|2020-12-01 00:49:45|              1|          1.6|       14.75|
|       1|2020-12-01 00:27:47|2020-12-01 00:45:40|              0|          8.4|       40.92|
|       1|2020-12-01 00:08:15|2020-12-01 00:16:04|              2|          2.7|       15.95|
|       1|2020-12-01 00:14:34|2020-12-01 00:31:04|              1|          7.6|       32.15|
|       2|2020-12-01 00:33:40|2020-12-01 01:00:35|              1|        16.74|       63.92|
|       2|2020-12-01 00:02:15|2020-12-01 00:13:09|              1|         4.16|        18.8|
|       2|2020-12-01 00:37:42|2020-12-01 00:45:11|              1|         2.22|         9.8|
|       2|2020-12-01 00:40:47|2020-12-01 00:57:03|              1|         6.44|       24.96|
|       2|2020-12-01 00:01:42|2020-12-01 00:06:06|              1|         0.99|       11.16|
+--------+-------------------+-------------------+---------------+-------------+------------+

PySpark – Inner join two dataframe

df_taxi1 = df_taxi.select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount").filter("VendorID==1").limit(10)
df_taxi1.show()

+--------+-------------------+-------------------+---------------+-------------+------------+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|
+--------+-------------------+-------------------+---------------+-------------+------------+
|       1|2020-12-01 00:07:13|2020-12-01 00:18:12|              1|          7.6|       33.92|
|       1|2020-12-01 00:41:19|2020-12-01 00:49:45|              1|          1.6|       14.75|
|       1|2020-12-01 00:27:47|2020-12-01 00:45:40|              0|          8.4|       40.92|
|       1|2020-12-01 00:08:15|2020-12-01 00:16:04|              2|          2.7|       15.95|
|       1|2020-12-01 00:14:34|2020-12-01 00:31:04|              1|          7.6|       32.15|
|       1|2020-12-01 00:11:02|2020-12-01 00:17:34|              1|          1.7|        11.3|
|       1|2020-12-01 00:54:55|2020-12-01 00:57:09|              1|          0.5|         7.8|
|       1|2020-12-01 00:11:22|2020-12-01 00:40:36|              1|         21.0|       71.85|
|       1|2020-12-01 00:53:58|2020-12-01 00:54:06|              1|          0.0|         3.8|
|       1|2020-12-01 00:52:58|2020-12-01 00:54:28|              1|          0.7|         7.8|
+--------+-------------------+-------------------+---------------+-------------+------------+

df_taxi2 = df_taxi.select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount").filter("VendorID==1").limit(5)
df_taxi2.show()

+--------+-------------------+-------------------+---------------+-------------+------------+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|
+--------+-------------------+-------------------+---------------+-------------+------------+
|       1|2020-12-01 00:07:13|2020-12-01 00:18:12|              1|          7.6|       33.92|
|       1|2020-12-01 00:41:19|2020-12-01 00:49:45|              1|          1.6|       14.75|
|       1|2020-12-01 00:27:47|2020-12-01 00:45:40|              0|          8.4|       40.92|
|       1|2020-12-01 00:08:15|2020-12-01 00:16:04|              2|          2.7|       15.95|
|       1|2020-12-01 00:14:34|2020-12-01 00:31:04|              1|          7.6|       32.15|
+--------+-------------------+-------------------+---------------+-------------+------------+

PySpark – INNER join two dataframes

df_taxi1.join(df_taxi2, 'pickup_time', how='inner').show()

+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+
|        pickup_time|vendorid|          drop_time|passenger_count|trip_distance|total_amount|vendorid|          drop_time|passenger_count|trip_distance|total_amount|
+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+
|2020-12-01 00:07:13|       1|2020-12-01 00:18:12|              1|          7.6|       33.92|       1|2020-12-01 00:18:12|              1|          7.6|       33.92|
|2020-12-01 00:41:19|       1|2020-12-01 00:49:45|              1|          1.6|       14.75|       1|2020-12-01 00:49:45|              1|          1.6|       14.75|
|2020-12-01 00:27:47|       1|2020-12-01 00:45:40|              0|          8.4|       40.92|       1|2020-12-01 00:45:40|              0|          8.4|       40.92|
|2020-12-01 00:08:15|       1|2020-12-01 00:16:04|              2|          2.7|       15.95|       1|2020-12-01 00:16:04|              2|          2.7|       15.95|
|2020-12-01 00:14:34|       1|2020-12-01 00:31:04|              1|          7.6|       32.15|       1|2020-12-01 00:31:04|              1|          7.6|       32.15|
+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+

PySpark – LEFT join two dataframes

df_taxi1.join(df_taxi2, 'pickup_time', how='left').show()

+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+
|        pickup_time|vendorid|          drop_time|passenger_count|trip_distance|total_amount|vendorid|          drop_time|passenger_count|trip_distance|total_amount|
+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+
|2020-12-01 00:07:13|       1|2020-12-01 00:18:12|              1|          7.6|       33.92|       1|2020-12-01 00:18:12|              1|          7.6|       33.92|
|2020-12-01 00:41:19|       1|2020-12-01 00:49:45|              1|          1.6|       14.75|       1|2020-12-01 00:49:45|              1|          1.6|       14.75|
|2020-12-01 00:27:47|       1|2020-12-01 00:45:40|              0|          8.4|       40.92|       1|2020-12-01 00:45:40|              0|          8.4|       40.92|
|2020-12-01 00:08:15|       1|2020-12-01 00:16:04|              2|          2.7|       15.95|       1|2020-12-01 00:16:04|              2|          2.7|       15.95|
|2020-12-01 00:14:34|       1|2020-12-01 00:31:04|              1|          7.6|       32.15|       1|2020-12-01 00:31:04|              1|          7.6|       32.15|
|2020-12-01 00:11:02|       1|2020-12-01 00:17:34|              1|          1.7|        11.3|    null|               null|           null|         null|        null|
|2020-12-01 00:54:55|       1|2020-12-01 00:57:09|              1|          0.5|         7.8|    null|               null|           null|         null|        null|
|2020-12-01 00:11:22|       1|2020-12-01 00:40:36|              1|         21.0|       71.85|    null|               null|           null|         null|        null|
|2020-12-01 00:53:58|       1|2020-12-01 00:54:06|              1|          0.0|         3.8|    null|               null|           null|         null|        null|
|2020-12-01 00:52:58|       1|2020-12-01 00:54:28|              1|          0.7|         7.8|    null|               null|           null|         null|        null|
+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+

PySpark – RIGHT join two dataframes

df_taxi1.join(df_taxi2, 'pickup_time', how='right').show()

+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+
|        pickup_time|vendorid|          drop_time|passenger_count|trip_distance|total_amount|vendorid|          drop_time|passenger_count|trip_distance|total_amount|
+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+
|2020-12-01 00:07:13|       1|2020-12-01 00:18:12|              1|          7.6|       33.92|       1|2020-12-01 00:18:12|              1|          7.6|       33.92|
|2020-12-01 00:41:19|       1|2020-12-01 00:49:45|              1|          1.6|       14.75|       1|2020-12-01 00:49:45|              1|          1.6|       14.75|
|2020-12-01 00:27:47|       1|2020-12-01 00:45:40|              0|          8.4|       40.92|       1|2020-12-01 00:45:40|              0|          8.4|       40.92|
|2020-12-01 00:08:15|       1|2020-12-01 00:16:04|              2|          2.7|       15.95|       1|2020-12-01 00:16:04|              2|          2.7|       15.95|
|2020-12-01 00:14:34|       1|2020-12-01 00:31:04|              1|          7.6|       32.15|       1|2020-12-01 00:31:04|              1|          7.6|       32.15|
+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+

PySpark – FULL OUTER join two dataframes

df_taxi1.join(df_taxi2, 'pickup_time', how='outer').show()

+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+
|        pickup_time|vendorid|          drop_time|passenger_count|trip_distance|total_amount|vendorid|          drop_time|passenger_count|trip_distance|total_amount|
+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+
|2020-12-01 00:07:13|       1|2020-12-01 00:18:12|              1|          7.6|       33.92|       1|2020-12-01 00:18:12|              1|          7.6|       33.92|
|2020-12-01 00:08:15|       1|2020-12-01 00:16:04|              2|          2.7|       15.95|       1|2020-12-01 00:16:04|              2|          2.7|       15.95|
|2020-12-01 00:11:02|       1|2020-12-01 00:17:34|              1|          1.7|        11.3|    null|               null|           null|         null|        null|
|2020-12-01 00:11:22|       1|2020-12-01 00:40:36|              1|         21.0|       71.85|    null|               null|           null|         null|        null|
|2020-12-01 00:14:34|       1|2020-12-01 00:31:04|              1|          7.6|       32.15|       1|2020-12-01 00:31:04|              1|          7.6|       32.15|
|2020-12-01 00:27:47|       1|2020-12-01 00:45:40|              0|          8.4|       40.92|       1|2020-12-01 00:45:40|              0|          8.4|       40.92|
|2020-12-01 00:41:19|       1|2020-12-01 00:49:45|              1|          1.6|       14.75|       1|2020-12-01 00:49:45|              1|          1.6|       14.75|
|2020-12-01 00:52:58|       1|2020-12-01 00:54:28|              1|          0.7|         7.8|    null|               null|           null|         null|        null|
|2020-12-01 00:53:58|       1|2020-12-01 00:54:06|              1|          0.0|         3.8|    null|               null|           null|         null|        null|
|2020-12-01 00:54:55|       1|2020-12-01 00:57:09|              1|          0.5|         7.8|    null|               null|           null|         null|        null|
+-------------------+--------+-------------------+---------------+-------------+------------+--------+-------------------+---------------+-------------+------------+

PySpark – Check NULL values in a column of a dataframe

df_taxi.filter("VendorID is NULL").select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount").show(5)

+--------+-------------------+-------------------+---------------+-------------+------------+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|
+--------+-------------------+-------------------+---------------+-------------+------------+
|    null|2020-12-01 00:04:00|2020-12-01 00:07:00|           null|         0.51|       59.84|
|    null|2020-12-01 00:24:00|2020-12-01 00:44:00|           null|         5.51|       30.08|
|    null|2020-12-01 00:46:00|2020-12-01 01:02:00|           null|         8.07|        43.0|
|    null|2020-12-01 00:06:00|2020-12-01 00:27:00|           null|         3.83|        25.5|
|    null|2020-12-01 00:27:13|2020-12-01 00:41:18|           null|         8.53|       26.22|
+--------+-------------------+-------------------+---------------+-------------+------------+
only showing top 5 rows

PySpark – DROP rows with NULL in any of the given columns

We will use the dropna function with how='any' to drop every row that has a NULL in at least one of the given columns.

df_taxi.count()
1461897

df_taxi.dropna(how='any',subset=["VendorID","passenger_count"]).count()
1362441

PySpark – DROP rows with NULL in all of the given columns

With how='all', a row is dropped only when every listed column is NULL. In this dataset VendorID and passenger_count are NULL together, which is why the count matches the how='any' result above.

df_taxi.count()
1461897

df_taxi.dropna(how='all',subset=["VendorID","passenger_count"]).count()
1362441

PySpark – Replace NULL values with a given value for a given column

We will use the fillna function to replace NULL values in a given column of the dataframe with a default value.

df_taxi.filter("VendorID is null").count()
99456

df_taxi.fillna(0,subset=["VendorID"]).filter("VendorID is null").count()
0

PySpark – Window function row number

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

df_taxi.withColumn("rowid",row_number().over(Window.partitionBy("VendorID").orderBy(col("total_amount").desc()))).filter("rowid <4").select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount","rowid").show(10)

+--------+-------------------+-------------------+---------------+-------------+------------+-----+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|rowid|
+--------+-------------------+-------------------+---------------+-------------+------------+-----+
|    null|2020-12-28 08:16:00|2020-12-28 09:33:00|           null|        60.93|      158.41|    1|
|    null|2020-12-18 09:12:00|2020-12-18 10:32:00|           null|        40.09|      150.75|    2|
|    null|2020-12-22 14:30:00|2020-12-22 15:25:00|           null|        34.28|      128.89|    3|
|       1|2020-12-26 13:39:29|2020-12-26 13:46:24|              2|          0.0|    398467.7|    1|
|       1|2020-12-08 19:01:07|2020-12-09 00:30:36|              0|        265.1|       921.2|    2|
|       1|2020-12-18 19:42:24|2020-12-18 19:42:57|              1|          0.0|      656.15|    3|
|       2|2020-12-20 23:01:34|2020-12-20 23:14:21|              1|         4.51|     8361.36|    1|
|       2|2020-12-11 13:09:01|2020-12-11 13:09:10|              1|          0.0|       617.8|    2|
|       2|2020-12-11 13:10:01|2020-12-11 13:10:10|              1|          0.0|       612.8|    3|
+--------+-------------------+-------------------+---------------+-------------+------------+-----+

PySpark – Window function rank

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

df_taxi.withColumn("rowid",rank().over(Window.partitionBy("VendorID").orderBy(col("total_amount").desc()))).filter("rowid=2").select("vendorid","pickup_time","drop_time","passenger_count","trip_distance","total_amount","rowid").show(10)

+--------+-------------------+-------------------+---------------+-------------+------------+-----+
|vendorid|        pickup_time|          drop_time|passenger_count|trip_distance|total_amount|rowid|
+--------+-------------------+-------------------+---------------+-------------+------------+-----+
|    null|2020-12-18 09:12:00|2020-12-18 10:32:00|           null|        40.09|      150.75|    2|
|       1|2020-12-08 19:01:07|2020-12-09 00:30:36|              0|        265.1|       921.2|    2|
|       2|2020-12-11 13:09:01|2020-12-11 13:09:10|              1|          0.0|       617.8|    2|
+--------+-------------------+-------------------+---------------+-------------+------------+-----+

PySpark – Pivot to convert rows into columns

We can convert rows into columns using the pivot function in PySpark. In this example we convert the row values of the "passenger_count" column into separate columns and compute the sum of "total_amount" for each one.

df_taxi.select("VendorID","passenger_count","total_amount").show(5)

+--------+---------------+------------+
|VendorID|passenger_count|total_amount|
+--------+---------------+------------+
|       1|              1|       33.92|
|       1|              1|       14.75|
|       2|              1|       63.92|
|       2|              1|        18.8|
|       2|              1|         9.8|
+--------+---------------+------------+
only showing top 5 rows

from pyspark.sql.functions import col

df_taxi.filter("VendorID in (1,2)").groupBy("VendorID").pivot("passenger_count",values=['1','2','3','4','5','6']).sum("total_amount").select("VendorID",col("1").cast("bigint"),col("2").cast("bigint"),col("3").cast("bigint"),col("4").cast("bigint"),col("5").cast("bigint"),col("6").cast("bigint")).show()

+--------+--------+-------+------+------+------+------+
|VendorID|       1|      2|     3|     4|     5|     6|
+--------+--------+-------+------+------+------+------+
|       1| 5482565|1265902|160975| 49064|  6209|  4495|
|       2|11213768|2122529|626923|263774|546248|431529|
+--------+--------+-------+------+------+------+------+

I will keep updating this list with more PySpark Code Snippets.