
How to convert SQL Queries into PySpark

In the previous post, we looked at many common conversions from SQL to DataFrame operations in PySpark. In this post, we will look at a strategy you can follow to convert a typical SQL query into DataFrame code in PySpark. If you have not read the previous post, I strongly recommend doing so, as we will refer to some code snippets from it.

Input Data and Spark SQL

We will use the Amazon product reviews open dataset for this post as the example to explain how you can convert a SQL query into a Spark DataFrame.

df_books = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet")

df_books.createOrReplaceTempView("tbl_books")

spark.sql("select product_id,count(star_rating) as total_rating,max(star_rating) as best_rating, min(star_rating) as worst_rating from tbl_books t1 where verified_purchase='Y' and review_date between '1995-07-22' and '2015-08-31' and marketplace in ('DE','US','UK','FR','JP') group by product_id order by total_rating desc limit 10").show(truncate=False)
+----------+------------+-----------+------------+
|product_id|total_rating|best_rating|worst_rating|
+----------+------------+-----------+------------+
|0345803485|1039        |5          |1           |
|034580404X|940         |5          |1           |
|1451695195|696         |5          |1           |
|0345803493|538         |5          |1           |
|0345803507|516         |5          |1           |
|0345529057|510         |5          |1           |
|1616550414|481         |5          |1           |
|030758836X|477         |5          |1           |
|1591451884|477         |5          |1           |
|0525478817|433         |5          |2           |
+----------+------------+-----------+------------+

Understand the SQL execution order

This is the first and most important step. You can apply multiple transformations on a DataFrame, but a lot depends on the order in which you apply them. The simplest method is to follow the SQL execution order and convert the SQL steps into DataFrame code in exactly that order. The table below lists the order, and the sketch after it shows how that order maps onto a DataFrame method chain.

SQL Execution Order

Sequence | Step     | Description
-------- | -------- | -------------------------------------------
1        | FROM     | Scan the base tables used in the query
2        | WHERE    | Filter the records from the base tables
3        | GROUP BY | Aggregate calculation on the base data
4        | HAVING   | Filter the records post aggregation
5        | SELECT   | Fetch the required columns for the user
6        | ORDER BY | Sort the data in the result set
7        | LIMIT    | Restrict the number of rows in the output
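Mapped onto a DataFrame, that same order simply becomes a method chain. The sketch below is only illustrative; df_base, some_column, group_column and row_count are placeholder names, not columns from our dataset.

from pyspark.sql import functions as F

result = (
    df_base                                    # FROM: start from the base DataFrame
    .filter("some_column = 'some_value'")      # WHERE: filter the base records
    .groupBy("group_column")                   # GROUP BY: grouping key(s)
    .agg(F.count("*").alias("row_count"))      # aggregate calculation
    .filter("row_count > 1")                   # HAVING: filter after aggregation
    .select("group_column", "row_count")       # SELECT: pick the output columns
    .orderBy(F.col("row_count").desc())        # ORDER BY: sort the result set
    .limit(10)                                 # LIMIT: restrict the number of rows
)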

Analyse the Input SQL Query

To understand this, we will use the sample query below, break it into its individual steps, and order those steps as per the table above.

SELECT product_id,
       Count(star_rating) AS total_rating,
       Max(star_rating)   AS best_rating,
       Min(star_rating)   AS worst_rating
FROM   tbl_books
WHERE  verified_purchase = 'Y'
       AND review_date BETWEEN '1995-07-22' AND '2015-08-31'
       AND marketplace IN ( 'DE', 'US', 'UK', 'FR', 'JP' )
GROUP  BY product_id
ORDER  BY total_rating DESC
LIMIT  10;

In the above query we can clearly see the different steps used, i.e. SELECT, FROM, WHERE, GROUP BY, ORDER BY and LIMIT.

Convert SQL Steps into equivalent Dataframe code

FROM

In this case, we have only one base table, and that is “tbl_books”. The equivalent of a table in PySpark is a DataFrame, so we will have a DataFrame corresponding to this table in our code. Let’s call it “df_books”.
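We already created this DataFrame while loading the input data. If the table is also registered in the catalog or as a temp view (as tbl_books is here), you can equally get a DataFrame from it with spark.table; both lines below give an equivalent df_books.

df_books = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet")
# or, since tbl_books is registered as a temp view:
df_books = spark.table("tbl_books")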

WHERE

Let’s identify the WHERE or FILTER condition in the given SQL Query.

WHERE  verified_purchase = 'Y'
       AND review_date BETWEEN '1995-07-22' AND '2015-08-31'
       AND marketplace IN ( 'DE', 'US', 'UK', 'FR', 'JP' )

We can convert this into DataFrame code in multiple ways; the easiest method is to copy the conditions and put them inside a filter function in PySpark. So our filter condition will look like

filter("verified_purchase='Y' and review_date between '1995-07-22' and '2015-08-31' and marketplace in ('DE','US','UK','FR','JP')")
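If you prefer column expressions over a raw SQL string, the same WHERE clause can also be written with col(), between() and isin(); the sketch below is equivalent to the string version above.

from pyspark.sql.functions import col

df_books.filter(
    (col("verified_purchase") == "Y")
    & col("review_date").between("1995-07-22", "2015-08-31")
    & col("marketplace").isin("DE", "US", "UK", "FR", "JP")
)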

GROUP BY

The given query has a GROUP BY on the “product_id” column. It also aggregates star_rating and calculates the COUNT, MAX and MIN. So the SQL fragment below can be handled in the DataFrame with groupBy followed by agg.

--Aggregate
Count(star_rating) AS total_rating,
Max(star_rating)   AS best_rating,
Min(star_rating)   AS worst_rating
--GroupBy
GROUP BY product_id

which converts to:

.groupBy("product_id")
.agg(count(col("star_rating")), max(col("star_rating")), min(col("star_rating")))

The query does not have a HAVING step, so we can skip it.
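For reference, if the query did have a HAVING clause, say HAVING Count(star_rating) > 500 (a hypothetical condition, not part of our query), it would simply become another filter applied after the aggregation:

# hypothetical HAVING: keep only products with more than 500 ratings
.agg(count(col("star_rating")).alias("total_rating"))
.filter("total_rating > 500")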

SELECT

The SELECT stage fetches all the required columns for the output. It can also apply column-level transformations; in this case it creates aliases for the aggregate columns. So we can create the aliases first using “withColumnRenamed” and then select the output columns.

.withColumnRenamed('count(star_rating)', 'total_rating')
.withColumnRenamed('max(star_rating)', 'best_rating')
.withColumnRenamed('min(star_rating)', 'worst_rating')
.select("product_id","total_rating","best_rating","worst_rating")

We could also have used “alias” in the agg function itself. Then the PySpark DataFrame code would look like this:

.agg(count(col("star_rating")).alias('total_rating'),max(col("star_rating")).alias('best_rating'),min(col("star_rating")).alias('worst_rating'))
.select("product_id","total_rating","best_rating","worst_rating")

You can use either approach here, i.e. withColumnRenamed or alias, whichever you find more comfortable.
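A third option, sketched below, is to do the renaming inside the select itself: after agg, the unaliased columns are literally named count(star_rating), max(star_rating) and min(star_rating), so they can be referenced by those names and aliased in one step.

.select(
    col("product_id"),
    col("count(star_rating)").alias("total_rating"),
    col("max(star_rating)").alias("best_rating"),
    col("min(star_rating)").alias("worst_rating")
)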

ORDER BY

This step sorts the output data as per the condition mentioned in the input query. We can use either “sort” or “orderBy” to convert this part of the query into DataFrame code. We will use orderBy, as it corresponds directly to the SQL ORDER BY.

.orderBy(col('total_rating'), ascending=False)
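Equivalently, you can call desc() on the column inside orderBy, which reads closer to the SQL DESC keyword:

.orderBy(col('total_rating').desc())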

LIMIT or TOP or SAMPLE

The last step is to restrict the number of rows returned to the user. This step limits the number of records in the final output. We can use “limit” to convert it into DataFrame code.

.limit(10)
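Note that limit(10) restricts the DataFrame itself, whereas passing a number to show only restricts how many rows are printed; either works for display, but only limit changes the data that any downstream step would see.

.limit(10).show(truncate=False)    # DataFrame trimmed to at most 10 rows, then displayed
.show(10, truncate=False)          # DataFrame unchanged, only 10 rows displayed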

This completes all the steps of the execution order. Let’s put everything together and see the converted query and its output.

Final Query

from pyspark.sql.functions import *

df_books.filter("verified_purchase='Y' and review_date between '1995-07-22' and '2015-08-31' and marketplace in ('DE','US','UK','FR','JP')")\
    .groupBy("product_id")\
    .agg(count(col("star_rating")),max(col("star_rating")),min(col("star_rating")))\
    .withColumnRenamed('count(star_rating)', 'total_rating')\
    .withColumnRenamed('max(star_rating)', 'best_rating')\
    .withColumnRenamed('min(star_rating)', 'worst_rating')\
    .select("product_id","total_rating","best_rating","worst_rating")\
    .orderBy(col('total_rating'),ascending = False)\
    .limit(10)\
    .show(truncate=False)

+----------+------------+-----------+------------+
|product_id|total_rating|best_rating|worst_rating|
+----------+------------+-----------+------------+
|0345803485|1039        |5          |1           |
|034580404X|940         |5          |1           |
|1451695195|696         |5          |1           |
|0345803493|538         |5          |1           |
|0345803507|516         |5          |1           |
|0345529057|510         |5          |1           |
|1616550414|481         |5          |1           |
|030758836X|477         |5          |1           |
|1591451884|477         |5          |1           |
|0525478817|433         |5          |2           |
+----------+------------+-----------+------------+

I hope the approach described above helps you convert SQL queries into DataFrame code in a more systematic manner.

Let me know your feedback in the comments section below.
