Recently, many people have reached out asking if I can help them learn PySpark, so I thought of building a utility that converts SQL into PySpark code. I am sharing my weekend project with you, where I have tried to convert input SQL into PySpark DataFrame code – a SQL to PySpark converter.
UPDATE 2 – 27 May 2022
I have removed the online utility now. I was so glad to see that my weekend project was actually liked and used by so many people. Thank you all for using the utility.
I would like to share a few numbers:
Total Queries Converted: 43091
Total Queries Failed: 30500
I have shared the code below so you can enhance it further and use it as per your requirements.
SQL to PySpark converter :: Important points:
It is almost impossible to cover all types of SQL, and this utility is no exception. Considering this is my weekend project and I am still working on it, the SQL coverage may not be as broad as you or I would have loved. That being said, I would like to share some points you can consider while using the utility.
- The utility does not support JOINs and subqueries right now.
- While using aggregate functions, make sure to use GROUP BY too.
- Try to use aliases for derived columns.
- Look at the sample query (a conversion example follows this list) and use similar SQL to convert to PySpark.
- I have tried to make sure that the generated output is accurate; however, I recommend that you verify the results at your end too.
- Version 1 : 24-May-2021
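To make the supported shape concrete, here is a minimal sketch of the kind of conversion the utility targets (based on a simplified version of the sample query used in the code below; the exact generated text may differ slightly):

Input SQL:

    SELECT product_id, Count(star_rating) AS total_rating
    FROM tbl_books
    WHERE verified_purchase = 'Y'
    GROUP BY product_id

Generated PySpark (roughly):

    tbl_books
    .filter("verified_purchase = 'Y'")
    .groupBy("product_id")
    .agg(count(col("star_rating")).alias('total_rating'))
    .select("product_id", "total_rating")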
If you are interested in learning how to convert SQL into PySpark, I strongly recommend reading the following posts:
Update – Code I have used to create this utility
I have received many comments from blog readers saying they want to contribute to this utility, and many people have also asked for the code.
Below is the code I have used to create this utility.
from moz_sql_parser import parse
from moz_sql_parser import format
import json

query = """
SELECT product_id,
       Count(star_rating) AS total_rating,
       Max(star_rating) AS best_rating,
       Min(star_rating) AS worst_rating
FROM tbl_books
WHERE verified_purchase = 'Y'
  AND review_date BETWEEN '1995-07-22' AND '2015-08-31'
  AND marketplace IN ( 'DE', 'US', 'UK', 'FR', 'JP' )
GROUP BY product_id
ORDER BY total_rating asc, product_id desc, best_rating
LIMIT 10;
"""

v_parse = parse(query)
# round-trip through JSON to normalise the parse tree into plain dicts/lists
v_json = json.loads(json.dumps(v_parse, indent=4))


def fn_from(value):
    # build the DataFrame name(s) for the FROM clause
    result_from = ""
    if type(value) is str:
        result_from = format({"from": value})
        result_from = result_from[5:]  # strip the leading "FROM "
    elif type(value) is dict:
        if "name" in value.keys():
            result_from = result_from + value['value'] + ".alias(\"" + value['name'] + "\")"
        else:
            result_from = result_from + value['value']
    elif type(value) is list:
        for item_from in value:
            if type(item_from) is dict:
                if "name" in item_from.keys():
                    result_from = result_from + item_from['value'] + ".alias(\"" + item_from['name'] + "\"),"
                else:
                    result_from = result_from + item_from['value'] + ","
            elif type(item_from) is str:
                result_from = result_from + item_from + ","
    return result_from


def fn_select(value):
    # build the column list for .select()
    result_select = ""
    if type(value) is str:
        result_select = result_select + "\"" + value + "\","
    elif type(value) is dict:
        if "name" in value.keys():
            result_select = result_select + "\"" + value['value'] + "\".alias(\"" + value['name'] + "\")"
        else:
            result_select = result_select + "\"" + value['value'] + "\""
    elif type(value) is list:
        for item_select in value:
            if type(item_select) is dict:
                if type(item_select['value']) is dict:
                    # aggregate expression: select it by the alias created in fn_agg()
                    result_select = result_select + "\"" + item_select['name'] + "\","
                else:
                    result_select = result_select + "\"" + item_select['value'] + "\","
    return result_select[:-1]  # drop the trailing comma


def fn_where(value):
    # reuse the parser's formatter and strip the leading "WHERE "
    return format({"where": value})[6:]


def fn_groupby(value):
    # reuse the parser's formatter and strip the leading "GROUP BY "
    return format({"groupby": value})[9:]


def fn_agg(query):
    # build expressions like count(col("x")).alias('y') for .agg()
    v_parse = parse(query)
    v_agg = ""
    for i in v_parse["select"]:
        if type(i["value"]) is dict:
            for key, value in i["value"].items():
                v_agg = v_agg + key + "(col(\"" + str(value) + "\")).alias('" + i["name"] + "'),"
    v_agg = v_agg.replace("\n", "")
    return v_agg[:-1]


def fn_orderby(query):
    # build col("x").asc() / col("x").desc() expressions for .orderBy()
    v_parse = parse(query)
    v_orderby_collist = ""
    for i in v_parse["orderby"]:
        if i.get("sort", "asc") == "desc":
            v_sortorder = "desc()"
        else:
            v_sortorder = "asc()"
        v_orderby_collist = v_orderby_collist + "col(\"" + str(i.get("value", "")) + "\")." + v_sortorder + ","
    return v_orderby_collist[:-1]


def fn_limit(query):
    v_parse = parse(query)
    return v_parse["limit"]


def fn_genSQL(data):
    v_fn_from = v_fn_where = v_fn_groupby = v_fn_agg = v_fn_select = v_fn_orderby = v_fn_limit = ""
    for key, value in data.items():
        # handle from
        if str(key) == "from":
            v_fn_from = fn_from(value)
        # handle where
        if str(key) == "where":
            v_fn_where = fn_where(value)
        # handle group by; aggregates only apply when a GROUP BY is present
        # (fn_agg, fn_orderby and fn_limit re-parse the global query string)
        if str(key) == "groupby":
            v_fn_groupby = fn_groupby(value)
            v_fn_agg = fn_agg(query)
        # handle select
        if str(key) == "select":
            v_fn_select = fn_select(value)
        # handle sort
        if str(key) == "orderby":
            v_fn_orderby = fn_orderby(query)
        # handle limit
        if str(key) == "limit":
            v_fn_limit = fn_limit(query)

    # stitch the DataFrame call chain together
    v_final_stmt = ""
    if v_fn_from:
        v_final_stmt = v_final_stmt + v_fn_from
    if v_fn_where:
        v_final_stmt = v_final_stmt + "\n.filter(\"" + v_fn_where + "\")"
    if v_fn_groupby:
        v_final_stmt = v_final_stmt + "\n.groupBy(\"" + v_fn_groupby + "\")"
    if v_fn_agg:
        v_final_stmt = v_final_stmt + "\n.agg(" + v_fn_agg + ")"
    if v_fn_select:
        v_final_stmt = v_final_stmt + "\n.select(" + v_fn_select + ")"
    if v_fn_orderby:
        v_final_stmt = v_final_stmt + "\n.orderBy(" + v_fn_orderby + ")"
    if v_fn_limit:
        v_final_stmt = v_final_stmt + "\n.limit(" + str(v_fn_limit) + ")"
    return v_final_stmt


print(fn_genSQL(v_json))
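For reference, the parse tree that moz_sql_parser produces for the sample query, and that fn_genSQL walks key by key, looks roughly like this (abbreviated; exact keys can vary by parser version):

    {
        "select": [
            {"value": "product_id"},
            {"value": {"count": "star_rating"}, "name": "total_rating"},
            {"value": {"max": "star_rating"}, "name": "best_rating"},
            {"value": {"min": "star_rating"}, "name": "worst_rating"}
        ],
        "from": "tbl_books",
        "where": {"and": [...]},
        "groupby": {"value": "product_id"},
        "orderby": [
            {"value": "total_rating"},
            {"value": "product_id", "sort": "desc"},
            {"value": "best_rating"}
        ],
        "limit": 10
    }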
Let me know if you have any comments.
It might be interesting to add a PySpark dialect to SQLglot:
https://github.com/tobymao/sqlglot
https://github.com/tobymao/sqlglot/tree/main/sqlglot/dialects
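For what it is worth, sqlglot can already transpile between SQL dialects today; a minimal sketch (note that the spark dialect emits Spark SQL text, not DataFrame code, so a true PySpark dialect would still be a new addition):

    import sqlglot

    # transpile a query into the Spark SQL dialect; transpile() returns a list of statements
    spark_sql = sqlglot.transpile(
        "SELECT product_id, COUNT(star_rating) AS total_rating FROM tbl_books GROUP BY product_id",
        write="spark",
    )[0]
    print(spark_sql)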
Thank you for sharing this.
I will give it a try as well.
It would be great if you could include a link to the converter. It helps the community, especially anyone starting to work with PySpark.
Cool tool 🙂 I like it very much!
Maybe, if you have some spare minutes over your weekends 🙂
could you add the listagg() function below to your tool?
It's a concatenated list with ':' as the delimiter.
That would be awesome. Many thanks in advance.
Cheers
SELECT l.ab_id,
       listagg(DISTINCT l.lf||':'||l.tol||':'||l.l, ', '
               ON OVERFLOW TRUNCATE) AS l_lst
FROM t_l l
WHERE l.stat = 'READY'
GROUP BY l.ab_id
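A possible PySpark equivalent of this listagg, as a sketch only: it assumes a DataFrame df with the same columns, builds the lf:tol:l string with concat_ws, de-duplicates with array_distinct, and joins the list with ', '. Note that ON OVERFLOW TRUNCATE has no direct DataFrame counterpart, and collect_list gives no ordering guarantee:

    from pyspark.sql import functions as F

    result = (
        df.filter(F.col("stat") == "READY")
          .groupBy("ab_id")
          .agg(
              F.concat_ws(
                  ", ",
                  F.array_distinct(                           # DISTINCT
                      F.collect_list(                         # gather values per ab_id
                          F.concat_ws(":", "lf", "tol", "l")  # lf:tol:l
                      )
                  ),
              ).alias("l_lst")
          )
    )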
Could you please post a link to the source code on GitHub? Thank you!
For any other queries it throws the error "undefined". How do I define the query/table/fields?
Thanks for the good work! Would you consider making it open source so other developers could fork it?
In case this project is open for self-service contributions, please do share the details.
Hi Sir, I am a Data Engineer new to PySpark, and I have been assigned a task; would you be so kind as to help me with it?
Hey, I would like to add join queries. I mean, we could work together and make it a little more powerful.
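For anyone picking that up: the generator's target output for a join would presumably be ordinary DataFrame join code. A hypothetical example of what it would need to emit (the table and column names here are made up for illustration):

    # hypothetical generated output for:
    #   SELECT b.product_id, r.star_rating
    #   FROM tbl_books b JOIN tbl_reviews r ON b.product_id = r.product_id
    tbl_books.alias("b") \
        .join(tbl_reviews.alias("r"), col("b.product_id") == col("r.product_id"), "inner") \
        .select("b.product_id", "r.star_rating")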