Recently, many people have reached out asking if I can help them learn PySpark, so I thought of building a utility that converts SQL into PySpark code. I am sharing my weekend project with you, where I have tried to convert an input SQL query into PySpark DataFrame code – sql to pyspark converter.
UPDATE 2 – 27 May 2022
I have removed the online utility now. I was so glad to see that my weekend project was actually liked and used by so many people. Thank you all for using the utility.
I would like to share a few numbers:
Total Queries Converted: 43091
Total Queries Failed: 30500
I have shared the code below so you can enhance it further and use it as per your requirements.

sql to pyspark converter :: Important points:
It is almost impossible to cover all types of SQL, and this utility is no exception. Considering this is my weekend project and I am still working on it, the SQL coverage may not be as broad as you or I would have liked. That being said, here are some points to consider while using the utility:
- The utility does not support JOINs & subqueries right now.
- When using aggregate functions, make sure to include a GROUP BY too.
- Try to use aliases for derived columns.
- Look at the sample query in the code section below; you can use similar SQL to convert to PySpark.
- I have tried to make sure the generated output is accurate; however, I recommend you verify the results at your end too.
- Version 1 : 24-May-2021
If you are interested in learning how to convert SQL into PySpark, I strongly recommend reading the following posts:
Update – Code I have used to create this utility
I have received many comments from blog readers who want to contribute to this utility, and many people have asked for the code.
Below is the code I used to create it.
from moz_sql_parser import parse
from moz_sql_parser import format
import json
query = """
SELECT product_id,
Count(star_rating) as total_rating,
Max(star_rating) AS best_rating,
Min(star_rating) AS worst_rating
FROM tbl_books
WHERE verified_purchase = 'Y'
AND review_date BETWEEN '1995-07-22' AND '2015-08-31'
AND marketplace IN ( 'DE', 'US', 'UK', 'FR', 'JP' )
GROUP BY product_id
ORDER BY total_rating asc,product_id desc,best_rating
LIMIT 10;
"""
v_parse = parse(query)
v_json = json.loads(json.dumps(v_parse, indent=4))  # round-trip to normalise the parse tree into plain dicts/lists
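# Illustrative (trimmed) shape of v_json for the sample query, based on how
# moz_sql_parser represents each clause:
# {
#     "select": [{"value": "product_id"},
#                {"value": {"count": "star_rating"}, "name": "total_rating"}, ...],
#     "from": "tbl_books",
#     "where": {"and": [{"eq": ["verified_purchase", {"literal": "Y"}]}, ...]},
#     "groupby": {"value": "product_id"},
#     "orderby": [{"value": "total_rating", "sort": "asc"}, ...],
#     "limit": 10
# }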
def fn_from(value):
    # Build the FROM part: a table name, optionally aliased.
    result_from = ""
    if type(value) is str:
        result_from = format({"from": value})
        result_from = result_from[5:]  # strip the leading "FROM "
    elif type(value) is dict:
        if "name" in value.keys():
            result_from = result_from + value['value'] + ".alias(\"" + value['name'] + "\")"
        else:
            result_from = result_from + value['value']
    elif type(value) is list:
        for item_from in value:
            if type(item_from) is dict:
                if "name" in item_from.keys():
                    result_from = result_from + item_from['value'] + ".alias(\"" + item_from['name'] + "\"),"
                else:
                    result_from = result_from + item_from['value'] + ","
            elif type(item_from) is str:
                result_from = result_from + item_from + ","
    return result_from
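# e.g. fn_from("tbl_books") -> 'tbl_books'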
def fn_select(value):
    # Build the argument list for .select() from the parsed SELECT clause.
    result_select = ""
    if type(value) is str:
        result_select = result_select + "\"" + value + "\","
    elif type(value) is dict:
        if "name" in value.keys():
            result_select = result_select + "\"" + value['value'] + "\".alias(\"" + value['name'] + "\")"
        else:
            result_select = result_select + "\"" + value['value'] + "\""
    elif type(value) is list:
        for item_select in value:
            if type(item_select) is dict:
                if type(item_select['value']) is dict:
                    # Aggregate expression: fn_agg() applies the alias, so
                    # select it by its alias name here.
                    if "name" in item_select.keys():
                        result_select = result_select + "\"" + item_select['name'] + "\","
                    else:
                        # No alias given; fall back to the formatted expression
                        # (the original code raised a KeyError on this path).
                        result_select = result_select + "\"" + format({"select": item_select})[7:] + "\","
                else:
                    result_select = result_select + "\"" + item_select['value'] + "\","
    return result_select[:-1]
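# e.g. for the sample query: "product_id","total_rating","best_rating","worst_rating"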
def fn_where(value):
    # Re-render the parsed WHERE clause as plain SQL for .filter().
    result_where = format({"where": value})[6:]  # strip the leading "WHERE "
    return result_where
def fn_groupby(value):
    # Re-render the parsed GROUP BY clause, stripping "GROUP BY ".
    result_groupby = format({"groupby": value})[9:]
    return result_groupby
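# e.g. for the sample query, fn_where(...) returns roughly
# "verified_purchase = 'Y' AND review_date BETWEEN ... AND marketplace IN (...)"
# (plain SQL reused inside .filter()), and fn_groupby(...) returns "product_id".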
def fn_agg(query):
    # Build the .agg() argument list from aggregate expressions in SELECT;
    # assumes derived columns are aliased (see the points above).
    v_parse = parse(query)
    v_agg = ""
    for i in v_parse["select"]:
        if type(i["value"]) is dict:
            for key, value in i["value"].items():
                v_agg = v_agg + key + "(" + "col(\"" + str(value) + "\")" + ").alias('" + i["name"] + "')" + ","
    v_agg = v_agg.replace("\n", "")
    return v_agg[:-1]
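# e.g. count(col("star_rating")).alias('total_rating'),max(col("star_rating")).alias('best_rating'),...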
def fn_orderby(query):
    # Build the .orderBy() argument list; sort order defaults to ascending.
    v_parse = parse(query)
    v_orderby_collist = ""
    v_orderby = v_parse["orderby"]
    for i in v_orderby:
        if i.get("sort", "asc") == "desc":
            v_sortorder = "desc()"
        else:
            v_sortorder = "asc()"
        v_orderby_collist = v_orderby_collist + "col(\"" + str(i.get("value", "")) + "\")." + v_sortorder + ","
    return v_orderby_collist[:-1]
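# e.g. col("total_rating").asc(),col("product_id").desc(),col("best_rating").asc()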
def fn_limit(query):
    v_parse = parse(query)
    v_limit = v_parse["limit"]
    return v_limit
def fn_genSQL(data):
    v_fn_from = v_fn_where = v_fn_groupby = v_fn_agg = v_fn_select = v_fn_orderby = v_fn_limit = ""
    # note: fn_agg, fn_orderby and fn_limit re-parse the global `query`
    # rather than reusing the already-parsed dict.
    for key, value in data.items():
        # handle from
        if str(key) == "from":
            v_fn_from = fn_from(value)
        # handle where
        if str(key) == "where":
            v_fn_where = fn_where(value)
        # handle groupby and the matching aggregates
        if str(key) == "groupby":
            v_fn_groupby = fn_groupby(value)
            v_fn_agg = fn_agg(query)
        # handle select
        if str(key) == "select":
            v_fn_select = fn_select(value)
        # handle sort
        if str(key) == "orderby":
            v_fn_orderby = fn_orderby(query)
        # handle limit
        if str(key) == "limit":
            v_fn_limit = fn_limit(query)
    v_final_stmt = ""
    if v_fn_from:
        v_final_stmt = v_final_stmt + v_fn_from
    if v_fn_where:
        v_final_stmt = v_final_stmt + "\n.filter(\"" + v_fn_where + "\")"
    if v_fn_groupby:
        v_final_stmt = v_final_stmt + "\n.groupBy(\"" + v_fn_groupby + "\")"
    if v_fn_agg:
        v_final_stmt = v_final_stmt + "\n.agg(" + v_fn_agg + ")"
    if v_fn_select:
        v_final_stmt = v_final_stmt + "\n.select(" + v_fn_select + ")"
    if v_fn_orderby:
        v_final_stmt = v_final_stmt + "\n.orderBy(" + v_fn_orderby + ")"
    if v_fn_limit:
        v_final_stmt = v_final_stmt + "\n.limit(" + str(v_fn_limit) + ")"
    return v_final_stmt
print(fn_genSQL(v_json))
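For the sample query, the script prints a DataFrame chain along these lines (hand-traced from the functions above; note the output starts with the bare table name, so you still need to turn that into a DataFrame yourself, e.g. via spark.table("tbl_books")):

tbl_books
.filter("verified_purchase = 'Y' AND review_date BETWEEN '1995-07-22' AND '2015-08-31' AND marketplace IN ('DE', 'US', 'UK', 'FR', 'JP')")
.groupBy("product_id")
.agg(count(col("star_rating")).alias('total_rating'),max(col("star_rating")).alias('best_rating'),min(col("star_rating")).alias('worst_rating'))
.select("product_id","total_rating","best_rating","worst_rating")
.orderBy(col("total_rating").asc(),col("product_id").desc(),col("best_rating").asc())
.limit(10)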
Let me know if you have any comments.
Might be interesting to add a PySpark dialect to SQLglot
https://github.com/tobymao/sqlglot
https://github.com/tobymao/sqlglot/tree/main/sqlglot/dialects
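For reference, sqlglot already ships a Spark dialect, so SQL-to-Spark-SQL transpilation is a one-liner. A minimal sketch (assuming sqlglot is installed, and noting that it emits Spark SQL rather than DataFrame-API code, so it complements rather than replaces this utility):

import sqlglot

# Transpile ANSI SQL to the Spark SQL dialect.
spark_sql = sqlglot.transpile(
    "SELECT product_id, COUNT(star_rating) AS total_rating "
    "FROM tbl_books GROUP BY product_id",
    write="spark",
)[0]
print(spark_sql)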
Thank you for sharing this.
I will give it a try as well.
It would be great if you could share a link to the converter. It would help anyone in the community starting to work with PySpark.
Cool tool 🙂 like it very much!
Maybe, if you have some spare minutes over your weekends 🙂, could you add the listagg() function below to your tool?
It's a concatenated list with ':' as the delimiter.
That would be awesome. Many thanks in advance.
Cheers
SELECT l.ab_id,
       listagg(DISTINCT l.lf||':'||l.tol||':'||l.l, ', '
               ON OVERFLOW TRUNCATE) AS l_lst
FROM t_l l
WHERE l.stat = 'READY'
GROUP BY l.ab_id
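For what it is worth, a rough PySpark equivalent of that listagg as a sketch only: df_l is a hypothetical DataFrame over t_l, and the ON OVERFLOW TRUNCATE clause has no direct DataFrame counterpart.

from pyspark.sql import functions as F

# listagg(DISTINCT l.lf||':'||l.tol||':'||l.l, ', ') as a collect/concat pipeline.
result = (
    df_l.filter(F.col("stat") == "READY")
        .groupBy("ab_id")
        .agg(F.concat_ws(
            ", ",
            F.array_distinct(
                F.collect_list(F.concat_ws(":", "lf", "tol", "l")))
        ).alias("l_lst"))
)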
Could you please post a link to a source code in github? Thank you!
For any other queries it is throwing the error "undefined". How do we define the query/table/fields?
Thanks for the good work! Would you consider making it open source so other developers could fork it?
In case this project is open for self-service contribution, please do share the details.
Hi Sir, I am a Data Engineer and I am new to PySpark. I have been assigned a task, so would you be so kind as to help me with that?
Hey, I would like to add join queries. I mean, we could work together and make it a little more powerful.