In the previous post, we covered many common conversions from SQL to DataFrame in PySpark. In this post, we will look at a strategy you can follow to convert a typical SQL query into DataFrame code in PySpark. If you have not read the previous post, I strongly recommend doing so, as we will refer to some code snippets from it.
Input Data and Spark SQL
We will use the Amazon open dataset for this post as an example to explain how you can convert a SQL query into a Spark DataFrame.
df_books = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet")

df_books.createOrReplaceTempView("tbl_books")

spark.sql("select product_id,count(star_rating) as total_rating,max(star_rating) as best_rating, min(star_rating) as worst_rating from tbl_books t1 where verified_purchase='Y' and review_date between '1995-07-22' and '2015-08-31' and marketplace in ('DE','US','UK','FR','JP') group by product_id order by total_rating desc limit 10").show(truncate=False)

+----------+------------+-----------+------------+
|product_id|total_rating|best_rating|worst_rating|
+----------+------------+-----------+------------+
|0345803485|1039        |5          |1           |
|034580404X|940         |5          |1           |
|1451695195|696         |5          |1           |
|0345803493|538         |5          |1           |
|0345803507|516         |5          |1           |
|0345529057|510         |5          |1           |
|1616550414|481         |5          |1           |
|030758836X|477         |5          |1           |
|1591451884|477         |5          |1           |
|0525478817|433         |5          |2           |
+----------+------------+-----------+------------+
Understand the SQL execution order
This is the first and most important step. You can apply multiple transformations on a DataFrame, but a lot depends on the order in which you apply them. The simplest method is to follow the SQL execution order and convert the SQL steps into DataFrame code in that same order.
SQL Execution Order
| Sequence | Step | Description |
|---|---|---|
| 1 | FROM | Scan the base tables used in the query |
| 2 | WHERE | Filter the records from the base tables |
| 3 | GROUP BY | Aggregate calculations on the base data |
| 4 | HAVING | Filter the records post aggregation |
| 5 | SELECT | Fetch the required columns for the user |
| 6 | ORDER BY | Sort the data in the result set |
| 7 | LIMIT | Restrict the number of rows in the output |
Analyse the Input SQL Query
To understand this, we will use the sample query below, break it into its different steps, and order them as per the table above.
SELECT product_id,
       Count(star_rating) AS total_rating,
       Max(star_rating)   AS best_rating,
       Min(star_rating)   AS worst_rating
FROM   tbl_books
WHERE  verified_purchase = 'Y'
       AND review_date BETWEEN '1995-07-22' AND '2015-08-31'
       AND marketplace IN ( 'DE', 'US', 'UK', 'FR', 'JP' )
GROUP  BY product_id
ORDER  BY total_rating DESC
LIMIT  10;
In the above query we can clearly see the different steps used, i.e. SELECT, FROM, WHERE, GROUP BY, ORDER BY & LIMIT.
Convert SQL Steps into equivalent Dataframe code
FROM
In this case, we have only one base table, “tbl_books”. The table equivalent in PySpark is a DataFrame, so we will have a DataFrame corresponding to this table in our code. Let’s call it “df_books”.
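As a side note, since we also registered the table as a temporary view earlier, an equivalent way to obtain this DataFrame is to read the view back with spark.table; a minimal sketch:

# The temp view "tbl_books" was registered earlier with createOrReplaceTempView;
# spark.table returns a DataFrame backed by that view.
df_books = spark.table("tbl_books")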
WHERE
Let’s identify the WHERE or FILTER condition in the given SQL Query.
WHERE  verified_purchase = 'Y'
       AND review_date BETWEEN '1995-07-22' AND '2015-08-31'
       AND marketplace IN ( 'DE', 'US', 'UK', 'FR', 'JP' )
We can convert this into DataFrame code in multiple ways, but the easiest method is to copy the conditions and put them inside a filter function in PySpark. Our filter condition will then look like this:
filter("verified_purchase='Y' and review_date between '1995-07-22' and '2015-08-31' and marketplace in ('DE','US','UK','FR','JP')")
GROUP BY
The given query has a GROUP BY on the “product_id” column. It also aggregates star_rating, calculating COUNT, MAX & MIN. The GROUP BY and aggregation together translate into the DataFrame code below.
--Aggregate
Count(star_rating) AS total_rating,
Max(star_rating)   AS best_rating,
Min(star_rating)   AS worst_rating

--GroupBy
GROUP BY product_id
.groupBy("product_id") .agg(count(col("star_rating")),max(col("star_rating")),min(col("star_rating")))
The query does not have a HAVING step, so we can skip it. If it did, the condition would simply become another filter applied after the aggregation, as sketched below.
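For example, if the query had ended with HAVING Count(star_rating) > 500, that condition would become a filter on the aggregated DataFrame. A hypothetical sketch (the 500 threshold and the variable names are made up for illustration):

from pyspark.sql.functions import col, count

# Hypothetical HAVING count(star_rating) > 500: alias the aggregate,
# then filter on it after the groupBy/agg step
df_having = df_filtered.groupBy("product_id")\
    .agg(count(col("star_rating")).alias("total_rating"))\
    .filter(col("total_rating") > 500)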
SELECT
The SELECT stage fetches all the required columns for the output and also performs column-level transformations. In this case it creates aliases for the aggregate columns, so we can rename those columns using “withColumnRenamed” and then select the output columns.
.withColumnRenamed('count(star_rating)', 'total_rating')\
.withColumnRenamed('max(star_rating)', 'best_rating')\
.withColumnRenamed('min(star_rating)', 'worst_rating')\
.select("product_id","total_rating","best_rating","worst_rating")
We could also have used “alias” inside the agg function itself. The PySpark DataFrame code would then look like this:
.agg(count(col("star_rating")).alias('total_rating'),max(col("star_rating")).alias('best_rating'),min(col("star_rating")).alias('worst_rating')) .select("product_id","total_rating","best_rating","worst_rating")
You can use either approach, withColumnRenamed or alias, whichever you find more comfortable.
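A third option, shown here only as a sketch, is toDF, which renames all columns of the aggregated DataFrame positionally in a single call (df_agg being the result of the groupBy/agg step above):

# Rename all four columns positionally: product_id plus the three aggregates
df_named = df_agg.toDF("product_id", "total_rating", "best_rating", "worst_rating")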
ORDER BY
This step sorts the output data as per the condition mentioned in the input query. We can use either “sort” or “orderBy” in the DataFrame code. We will use orderBy as it corresponds directly to SQL ORDER BY.
.orderBy(col('total_rating'), ascending=False)
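Equivalently, the descending sort can be written with the desc helper or the Column.desc() method; a quick sketch using the illustrative df_named from above:

from pyspark.sql.functions import col, desc

# Both lines match ORDER BY total_rating DESC
df_sorted = df_named.orderBy(desc("total_rating"))
df_sorted = df_named.orderBy(col("total_rating").desc())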
LIMIT or TOP or SAMPLE
The last step restricts the number of records in the final output shown to the user. We can use “limit” to convert it into DataFrame code.
.limit(10)
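If the source query used TOP 10 (SQL Server style), the conversion is the same limit call. For a SAMPLE-style query, DataFrame.sample draws a random fraction of rows instead of a fixed count; a rough sketch, with df_sorted as the sorted DataFrame from the previous step and an arbitrary 1% fraction:

# LIMIT / TOP 10 -> a fixed number of rows
df_top10 = df_sorted.limit(10)

# SAMPLE -> an approximate 1% random sample (fraction of rows, not a row count)
df_sampled = df_sorted.sample(fraction=0.01, seed=42)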
This completes all the steps in the execution order. Let’s put everything together and look at the converted query and its output.
Final Query
from pyspark.sql.functions import *

df_books.filter("verified_purchase='Y' and review_date between '1995-07-22' and '2015-08-31' and marketplace in ('DE','US','UK','FR','JP')")\
.groupBy("product_id")\
.agg(count(col("star_rating")),max(col("star_rating")),min(col("star_rating")))\
.withColumnRenamed('count(star_rating)', 'total_rating')\
.withColumnRenamed('max(star_rating)', 'best_rating')\
.withColumnRenamed('min(star_rating)', 'worst_rating')\
.select("product_id","total_rating","best_rating","worst_rating")\
.orderBy(col('total_rating'),ascending = False)\
.limit(10)\
.show(truncate=False)

+----------+------------+-----------+------------+
|product_id|total_rating|best_rating|worst_rating|
+----------+------------+-----------+------------+
|0345803485|1039        |5          |1           |
|034580404X|940         |5          |1           |
|1451695195|696         |5          |1           |
|0345803493|538         |5          |1           |
|0345803507|516         |5          |1           |
|0345529057|510         |5          |1           |
|1616550414|481         |5          |1           |
|030758836X|477         |5          |1           |
|1591451884|477         |5          |1           |
|0525478817|433         |5          |2           |
+----------+------------+-----------+------------+
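One quick way to sanity-check the conversion is to compare the DataFrame result with the original spark.sql output. A minimal sketch, where df_result holds the chained DataFrame above and sql_result the spark.sql version; note that ties in total_rating can make LIMIT 10 non-deterministic, so treat this only as a rough check:

# exceptAll returns rows present in one result but missing from the other;
# two empty differences mean both queries returned the same rows
assert sql_result.exceptAll(df_result).count() == 0
assert df_result.exceptAll(sql_result).count() == 0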
I hope the approach described above helps you convert SQL queries into DataFrame code in a more systematic manner. If you prefer watching a video, check out the one below:
Let me know your feedback in the comments section below.