How to implement recursive queries in Spark?

Recently I was working on a project in which the client's data warehouse was in Teradata. The requirement was to have something similar on Hadoop for a specific business application. At a high level, the requirement was to have the same data and run similar SQL on it so that Hadoop produced exactly the same reports. I didn't see any challenge in migrating the data from Teradata to Hadoop, and transforming the SQL into equivalent Hive/Spark SQL is not that difficult nowadays. The only real challenge was converting the Teradata recursive queries to Spark, since Spark does not support recursive queries.

I searched for various options online and even explored the Spark GraphX API, but I could not find a suitable solution. I was also wondering whether I could come up with a more SQL-like solution for recursive queries, so that it would be easy to implement and to modify for more complex scenarios. I tried something on spark-shell, using a Scala loop to replicate similar recursive functionality in Spark. To understand the solution, let us first see how a recursive query works in Teradata.

In a recursive query, there is a seed statement, which is the first query and generates a result set. The seed statement executes only once. In the second step, whatever result set is generated by the seed statement is JOINed with some other (or the same) table to generate another result set. This second step keeps running against the most recent result set as long as the JOIN returns rows; once no new row is retrieved, the iteration ends. All the data generated is present in a recursive table which is available to the user for querying. So I have replicated the same steps using DataFrames and temporary tables in Spark.
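
Before going through the concrete examples, here is a minimal sketch of the pattern this post implements. It is not part of the original Teradata solution, just an illustration: a hypothetical helper that takes a seed DataFrame and a "step" function (the iterator join) and loops until a step returns no rows.

import org.apache.spark.sql.DataFrame

// Illustrative helper (hypothetical name), not part of the step-by-step solution below.
def runRecursive(seed: DataFrame, step: DataFrame => DataFrame): DataFrame = {
  var result  = seed            // everything produced so far (the "recursive table")
  var current = seed            // rows produced by the latest iteration
  while (current.count() > 0) {
    current = step(current)     // JOIN the latest rows back to the source table
    if (current.count() > 0) {
      result = result.union(current)
    }
  }
  result
}

The step-by-step solution below does the same thing, but keeps each iteration's result in a numbered temp table so that plain SQL can be reused throughout.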

We will go through two examples of Teradata recursive queries and see the equivalent Spark code for each. The first example is from the Teradata site (Reference: Teradata Recursive Query).
To create this dataset locally, you can use the commands below:

CREATE TABLE employee_rec (employee_number INT ,manager_employee_number INT);

insert into employee_rec values( 801,NULL);
insert into employee_rec values( 1003,801);
insert into employee_rec values( 1019,801);
insert into employee_rec values( 1016,801);
insert into employee_rec values( 1008,1019);
insert into employee_rec values( 1006,1019);
insert into employee_rec values( 1014,1019);
insert into employee_rec values( 1011,1019);
insert into employee_rec values( 1010,1003);
insert into employee_rec values( 1001,1003);
insert into employee_rec values( 1004,1003);
insert into employee_rec values( 1012,1004);
insert into employee_rec values( 1002,1004);
insert into employee_rec values( 1015,1004);
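
If you do not have a Teradata or Hive environment handy, a quick alternative (purely a convenience sketch, not part of the original setup) is to build the same dataset directly in spark-shell and register it as a temp view; the queries that follow would then reference employee_rec instead of db_recursive.employee_rec.

// spark.implicits._ is already in scope in spark-shell
val employee_rec = Seq[(Int, Option[Int])](
  (801, None), (1003, Some(801)), (1019, Some(801)), (1016, Some(801)),
  (1008, Some(1019)), (1006, Some(1019)), (1014, Some(1019)), (1011, Some(1019)),
  (1010, Some(1003)), (1001, Some(1003)), (1004, Some(1003)),
  (1012, Some(1004)), (1002, Some(1004)), (1015, Some(1004))
).toDF("employee_number", "manager_employee_number")

employee_rec.createOrReplaceTempView("employee_rec")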

Teradata Recursive Query:

WITH RECURSIVE temp_table (employee_number) AS
(

SELECT root.employee_number FROM employee_rec root WHERE root.manager_employee_number = 801

UNION ALL

SELECT indirect.employee_number FROM temp_table direct, employee_rec indirect WHERE direct.employee_number = indirect.manager_employee_number

) SELECT * FROM temp_table ORDER BY employee_number;

In the above query, the first SELECT (the part before UNION ALL) is the "seed" statement, and the second SELECT is the "iterator" statement. "temp_table" is the output recursive table.

Spark equivalent: I am using Spark 2. Open a spark-shell instance.

Step 1: Declare 2 variables. The first one holds the number of rows in the newest dataset and the second one is used as a counter.

scala> var df_cnt:Int = 1
df_cnt: Int = 1
scala> var cnt: Int = 1
cnt: Int = 1

Step 2: Create a dataframe which will hold the output of the seed statement.

scala> val df_employee_seed = spark.sql("select employee_number from db_recursive.employee_rec where manager_employee_number = 801");
df_employee_seed: org.apache.spark.sql.DataFrame = [employee_number: int]

Step 3: Register the dataframe as a temp table to be used in the next step for iteration.

scala> df_employee_seed.registerTempTable("vt_seed0");

Step 4: Run a while loop to replicate the iteration step

Use a while loop to generate a new dataframe for each run, registering it under a sequentially numbered temp-table name. At each step, the previous dataframe is used to retrieve the new result set. If that dataframe does not have any rows, the loop is terminated. The same query from the "iterator" statement is used here too. Also, a temp table is only registered if the dataframe has rows in it; hence the "IF" condition inside the WHILE loop.

scala> while (df_cnt != 0) {
     |  var tblnm = "vt_seed".concat((cnt-1).toString) ;
     |  var tblnm1 = "vt_seed".concat((cnt).toString) ;
     |  val df_employee_rec = spark.sql(s"select indirect.employee_number FROM $tblnm direct, db_recursive.employee_rec indirect WHERE direct.employee_number = indirect.manager_employee_number");
     |  df_cnt=df_employee_rec.count.toInt;
     |  if(df_cnt!=0){
     |  df_employee_rec.registerTempTable(s"$tblnm1");
     |  }
     |  cnt = cnt + 1;
     |  }

Step 5: Merge the multiple datasets into one and run the final query

scala> var fin_query = "";
fin_query: String = ""

scala>  for ( a <- 0 to (cnt - 2)){
     |  if(a == 0 ){
     |  fin_query = fin_query.concat("select employee_number from vt_seed").concat(a.toString());
     |  } else {
     |  fin_query = fin_query.concat(" union select employee_number from vt_seed").concat(a.toString());
     |  }
     |  println(fin_query);
     |  }
select employee_number from vt_seed0
select employee_number from vt_seed0 union select employee_number from vt_seed1
select employee_number from vt_seed0 union select employee_number from vt_seed1 union select employee_number from vt_seed2

scala>  val final_result = spark.sql(s"$fin_query");
final_result: org.apache.spark.sql.DataFrame = [employee_number: int]

scala>  final_result.sort($"employee_number".asc).show();
+---------------+
|employee_number|
+---------------+
|           1001|
|           1002|
|           1003|
|           1004|
|           1006|
|           1008|
|           1010|
|           1011|
|           1012|
|           1014|
|           1015|
|           1016|
|           1019|
+---------------+

As you can see, this is the same result as we get in Teradata. I have tried another example of a Teradata recursive query. Reference: Teradata SQL Tutorial

CREATE TABLE vt_source
(
dept_no INT,
emp_name VARCHAR(100)
);

INSERT INTO vt_source values( 10,'A');
INSERT INTO vt_source values( 10,'B');
INSERT INTO vt_source values( 10,'C');
INSERT INTO vt_source values( 10,'D');
INSERT INTO vt_source values( 20,'P');
INSERT INTO vt_source values( 20,'Q');
INSERT INTO vt_source values( 20,'R');
INSERT INTO vt_source values( 20,'S');

CREATE TABLE vt_source_1 AS
SELECT dept_no,emp_name,
ROW_NUMBER() OVER(ORDER BY dept_no ASC,emp_name ASC) AS row_id
FROM vt_source;
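
Again, if you want to try this locally without Hive, a rough spark-shell equivalent of the two statements above is shown here (illustrative only; the loop below would then use vt_source_1 instead of db_recursive.vt_source_1).

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val vt_source = Seq(
  (10, "A"), (10, "B"), (10, "C"), (10, "D"),
  (20, "P"), (20, "Q"), (20, "R"), (20, "S")
).toDF("dept_no", "emp_name")

// ROW_NUMBER() OVER (ORDER BY dept_no, emp_name): a global window, so Spark will warn
// about moving all data to a single partition; that is fine for a small sample like this.
val vt_source_1 = vt_source.withColumn(
  "row_id", row_number().over(Window.orderBy($"dept_no".asc, $"emp_name".asc)))

vt_source_1.createOrReplaceTempView("vt_source_1")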

Teradata Recursive Query

WITH RECURSIVE str_concat(dept_no,emp_name ,row_id)
AS
(
SEL dept_no,emp_name ,row_id
FROM vt_source_1 tb1
WHERE tb1.row_id=1
UNION ALL
SEL tb1.dept_no,tb2.emp_name||','||tb1.emp_name ,tb1.row_id
FROM vt_source_1 tb1,str_concat tb2
WHERE tb2.row_id+1=tb1.row_id
)
SEL dept_no, emp_name AS emp_list FROM
(SEL dept_no,emp_name,row_id FROM str_concat)tb3
ORDER BY 1,2

In Spark, we will follow the same steps for this recursive query too: run the seed statement once and put the iterative query in a while loop. Putting all the steps together, we have the following code in Spark:

scala> var df_cnt:Int = 1
df_cnt: Int = 1

scala>  var cnt: Int = 1
cnt: Int = 1

scala>  val df_employee_seed = spark.sql("SELECT dept_no,emp_name ,row_id FROM db_recursive.vt_source_1 tb1 WHERE tb1.row_id=1");
df_employee_seed: org.apache.spark.sql.DataFrame = [dept_no: int, emp_name: string ... 1 more field]

scala>  df_employee_seed.registerTempTable("vt_seed0");
warning: there was one deprecation warning; re-run with -deprecation for details

scala>  while (df_cnt != 0) {
     |  var tblnm = "vt_seed".concat((cnt-1).toString) ;
     |  var tblnm1 = "vt_seed".concat((cnt).toString) ;
     |   val df_employee_rec = spark.sql(s"SELECT tb1.dept_no,concat_ws(',',tb2.emp_name,tb1.emp_name) as emp_name ,tb1.row_id FROM db_recursive.vt_source_1 tb1,$tblnm tb2 WHERE tb2.row_id+1=tb1.row_id");
     |  df_cnt=df_employee_rec.count.toInt;
     |  if(df_cnt!=0){
     |  df_employee_rec.registerTempTable(s"$tblnm1");
     |  }
     |  cnt = cnt + 1;
     |  }
scala> var fin_query = "";
fin_query: String = ""

scala>  for ( a <- 0 to (cnt - 2)){
     |  if(a == 0 ){
     |  fin_query = fin_query.concat("select dept_no, emp_name from vt_seed").concat(a.toString());
     |  } else {
     |  fin_query = fin_query.concat(" union select dept_no, emp_name from vt_seed").concat(a.toString());
     |  }
     |  }

scala>  val final_result = spark.sql(s"$fin_query");
final_result: org.apache.spark.sql.DataFrame = [dept_no: int, emp_name: string]

scala>  final_result.sort($"dept_no".asc,$"emp_name".asc).show();
+-------+---------------+
|dept_no|       emp_name|
+-------+---------------+
|     10|              A|
|     10|            A,B|
|     10|          A,B,C|
|     10|        A,B,C,D|
|     20|      A,B,C,D,P|
|     20|    A,B,C,D,P,Q|
|     20|  A,B,C,D,P,Q,R|
|     20|A,B,C,D,P,Q,R,S|
+-------+---------------+

Note: You can use createOrReplaceTempView in place of registerTempTable to avoid deprecation warning.
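
For example, the registration calls above could be written as follows (same behaviour, just the newer, non-deprecated API):

// instead of df_employee_seed.registerTempTable("vt_seed0")
df_employee_seed.createOrReplaceTempView("vt_seed0")

// and inside the while loop, instead of registerTempTable(s"$tblnm1")
df_employee_rec.createOrReplaceTempView(s"$tblnm1")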

In this way, I was able to convert simple recursive queries into equivalent Spark code. I hope this helps you too. I know it is not the most efficient solution; however, I cannot think of any other way of achieving it. If you have a better way of implementing the same thing in Spark, feel free to leave a comment. I will be more than happy to test your method. Also, if you have any questions regarding the process I have explained here, leave a comment and I will try to answer your queries.
