How to implement recursive queries in Spark?

Recently I was working on a project in which the client's data warehouse was in Teradata. The requirement was to have something similar on Hadoop for a specific business application: at a high level, hold the same data and run equivalent SQL on it to produce exactly the same reports on Hadoop too. Migrating the data from Teradata to Hadoop is not much of a challenge, and transforming the SQL into equivalent Hive/Spark SQL is not that difficult either. The only real challenge was converting the Teradata recursive queries, since Spark does not support recursive queries.

I searched for various options online and even explored the Spark GraphX API, but I could not find a suitable solution. I was also hoping for a more SQL-like solution to recursive queries, since that would be easy to implement and to modify for more complex scenarios. So I tried something in spark-shell, using a Scala loop to replicate the recursive behaviour in Spark. To understand the solution, let us first see how a recursive query works in Teradata.

In a recursive query there is a seed statement, which is the first query: it generates an initial result set and executes only once. In the second step, whatever result set the seed statement generated is joined with the same (or another) table to generate another result set. This second step repeats as long as the join keeps returning rows; once no new row is retrieved, the iteration ends. All the data generated along the way is collected in a recursive table that is available to the user for querying. I have replicated the same steps using DataFrames and temporary views in Spark.
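
Before getting to the full Teradata examples below, here is a minimal sketch of that pattern in spark-shell, on a toy parent/child table (the table, column and view names here are only illustrative and not part of the Teradata examples):

import spark.implicits._   // already in scope in spark-shell

// Toy data: 1 manages 2, 2 manages 3, 3 manages 4
val edges = Seq((2, 1), (3, 2), (4, 3)).toDF("child", "parent")
edges.createOrReplaceTempView("edges")

// Seed: runs only once
var level  = spark.sql("select child from edges where parent = 1")
var result = level
var i = 0

// Iterate: keep joining the previous level back to the table until no rows come back
while (level.count() > 0) {
  val view = s"level_$i"
  level.createOrReplaceTempView(view)
  level  = spark.sql(s"select e.child from edges e join $view p on e.parent = p.child")
  result = result.union(level)
  i += 1
}

result.distinct().orderBy("child").show()   // 2, 3, 4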

We will go through two examples of Teradata recursive queries and see the equivalent Spark code for each. The first example is from the Teradata site (Reference: Teradata Recursive Query).
To create this dataset locally, you can use the commands below:

CREATE TABLE employee_rec (employee_number INT, manager_employee_number INT);

insert into employee_rec values( 801,NULL);
insert into employee_rec values( 1003,801);
insert into employee_rec values( 1019,801);
insert into employee_rec values( 1016,801);
insert into employee_rec values( 1008,1019);
insert into employee_rec values( 1006,1019);
insert into employee_rec values( 1014,1019);
insert into employee_rec values( 1011,1019);
insert into employee_rec values( 1010,1003);
insert into employee_rec values( 1001,1003);
insert into employee_rec values( 1004,1003);
insert into employee_rec values( 1012,1004);
insert into employee_rec values( 1002,1004);
insert into employee_rec values( 1015,1004);

Teradata Recursive Query: Example 1

WITH RECURSIVE temp_table (employee_number) AS
(
SELECT root.employee_number
FROM employee_rec root
WHERE root.manager_employee_number = 801

UNION ALL

SELECT indirect.employee_number
FROM temp_table direct, employee_rec indirect
WHERE direct.employee_number = indirect.manager_employee_number
)
SELECT * FROM temp_table ORDER BY employee_number;

Output of the above recursive query:

+---------------+
|employee_number|
+---------------+
|           1001|
|           1002|
|           1003|
|           1004|
|           1006|
|           1008|
|           1010|
|           1011|
|           1012|
|           1014|
|           1015|
|           1016|
|           1019|
+---------------+

In the above query, the part before UNION ALL is known as the “seed” statement, and the part after it is the “iterator” statement. “temp_table” is the final recursive output table.

Spark equivalent: I am using Spark 2. Open a spark-shell instance.

Step 0: Create a Spark DataFrame

val df = spark.createDataFrame(Seq(
(801,None),
(1003,Some(801)),
(1019,Some(801)),
(1016,Some(801)),
(1008,Some(1019)),
(1006,Some(1019)),
(1014,Some(1019)),
(1011,Some(1019)),
(1010,Some(1003)),
(1001,Some(1003)),
(1004,Some(1003)),
(1012,Some(1004)),
(1002,Some(1004)),
(1015,Some(1004))
)).toDF("employee_number", "manager_employee_number")

df.createOrReplaceTempView("employee_rec");

Step 1: Declare two variables. The first holds the number of rows in the newly generated dataset, and the second is used as an iteration counter.

var df_cnt:Int = 1
var cnt: Int = 1

Step 2: Create a DataFrame which will hold the output of the seed statement.

val df_employee_seed = df.filter("manager_employee_number = 801")

Step 3: Register the DataFrame as a temp view to be used for iteration in the next step.

df_employee_seed.createOrReplaceTempView("vt_seed0");

Step 4: Run a while loop to replicate the iteration step

Use a while loop to generate a new DataFrame for each run, registering each one as a temp view with a sequence number in its name. At each step, the previous DataFrame is used to retrieve the next result set; if that DataFrame has no rows, the loop terminates. The query here is the same as the “iterator” statement above. A temp view is registered only when the DataFrame actually has rows, hence the if condition inside the while loop.

while (df_cnt != 0) {
  val tblnm  = "vt_seed".concat((cnt - 1).toString)
  val tblnm1 = "vt_seed".concat(cnt.toString)
  val df_employee_rec = spark.sql(s"select indirect.employee_number FROM $tblnm direct, employee_rec indirect WHERE direct.employee_number = indirect.manager_employee_number")
  df_cnt = df_employee_rec.count.toInt
  if (df_cnt != 0) {
    df_employee_rec.createOrReplaceTempView(tblnm1)
  }
  cnt = cnt + 1
}
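
One practical note on this loop: a temp view over a DataFrame is not materialized, so every later reference to a vt_seedN view (the count, the next iteration's join, and the final query in Step 5 below) recomputes it from the whole chain of joins behind it. For deeper hierarchies it may be worth caching each level. A sketch of the same loop with that single change (cache() plus the count() that materializes it):

while (df_cnt != 0) {
  val tblnm  = "vt_seed".concat((cnt - 1).toString)
  val tblnm1 = "vt_seed".concat(cnt.toString)
  // cache() marks the level for in-memory storage; the count() below materializes it,
  // so later references to the temp view do not recompute the earlier levels
  val df_employee_rec = spark.sql(s"select indirect.employee_number FROM $tblnm direct, employee_rec indirect WHERE direct.employee_number = indirect.manager_employee_number").cache()
  df_cnt = df_employee_rec.count.toInt
  if (df_cnt != 0) {
    df_employee_rec.createOrReplaceTempView(tblnm1)
  }
  cnt = cnt + 1
}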

Step 5: Merge the multiple datasets into one and run the final query

var fin_query = "";

for (a <- 0 to (cnt - 2)) {
  if (a == 0) {
    fin_query = fin_query
      .concat("select employee_number from vt_seed")
      .concat(a.toString());
  } else {
    fin_query = fin_query
      .concat(" union select employee_number from vt_seed")
      .concat(a.toString());
  }
}

/*
println(fin_query);
select employee_number from vt_seed0 union select employee_number from vt_seed1 union select employee_number from vt_seed2
*/

val final_result = spark.sql(s"$fin_query");
final_result.sort($"employee_number".asc).show();

+---------------+
|employee_number|
+---------------+
|           1001|
|           1002|
|           1003|
|           1004|
|           1006|
|           1008|
|           1010|
|           1011|
|           1012|
|           1014|
|           1015|
|           1016|
|           1019|
+---------------+

As you can see, this is the same result as we got in Teradata.

Complete Final Code – Scala

//Step 0: Create dataframe in Scala
val df = spark.createDataFrame(Seq(
(801,None),
(1003,Some(801)),
(1019,Some(801)),
(1016,Some(801)),
(1008,Some(1019)),
(1006,Some(1019)),
(1014,Some(1019)),
(1011,Some(1019)),
(1010,Some(1003)),
(1001,Some(1003)),
(1004,Some(1003)),
(1012,Some(1004)),
(1002,Some(1004)),
(1015,Some(1004))
)).toDF("employee_number", "manager_employee_number")

df.show()
df.createOrReplaceTempView("employee_rec");

// Initialize counters
var df_cnt:Int = 1
var cnt: Int = 1

// fetch seed rows (employees reporting directly to 801)
val df_employee_seed = df.filter("manager_employee_number = 801")
df_employee_seed.createOrReplaceTempView("vt_seed0");

// run the while loop, iterating as long as the join keeps returning new rows
while (df_cnt != 0) {
  val tblnm  = "vt_seed".concat((cnt - 1).toString)
  val tblnm1 = "vt_seed".concat(cnt.toString)
  val df_employee_rec = spark.sql(s"select indirect.employee_number FROM $tblnm direct, employee_rec indirect WHERE direct.employee_number = indirect.manager_employee_number")
  df_cnt = df_employee_rec.count.toInt
  if (df_cnt != 0) {
    df_employee_rec.createOrReplaceTempView(tblnm1)
  }
  cnt = cnt + 1
}

var fin_query = "";

// create final query by merging all datasets
for (a <- 0 to (cnt - 2)) {
  if (a == 0) {
    fin_query = fin_query
      .concat("select employee_number from vt_seed")
      .concat(a.toString());
  } else {
    fin_query = fin_query
      .concat(" union select employee_number from vt_seed")
      .concat(a.toString());
  }
}

println(fin_query);

//run final query
val final_result = spark.sql(s"$fin_query");
final_result.sort($"employee_number".asc).show();
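
The same steps can also be wrapped in a small reusable helper that collects the levels with union instead of building the final UNION query string. This is only a sketch (the recursiveUnion helper and its signature are hypothetical, not part of any Spark API), and it assumes the seed query and the iteration query return the same columns:

import org.apache.spark.sql.{DataFrame, SparkSession}

// iterSql takes the name of the temp view holding the previous level
// and returns the SQL that produces the next level
def recursiveUnion(spark: SparkSession, seed: DataFrame, iterSql: String => String): DataFrame = {
  var level  = seed   // rows produced by the latest iteration
  var result = seed   // everything found so far
  var i      = 0
  while (level.count() > 0) {
    val view = s"vt_level_$i"
    level.createOrReplaceTempView(view)
    level  = spark.sql(iterSql(view))
    result = result.union(level)
    i += 1
  }
  result
}

// Example 1 rewritten with the helper
val seed = spark.sql(
  "select employee_number from employee_rec where manager_employee_number = 801")
val hierarchy = recursiveUnion(spark, seed, view =>
  s"select indirect.employee_number from $view direct, employee_rec indirect " +
   "where direct.employee_number = indirect.manager_employee_number")
hierarchy.distinct.sort($"employee_number".asc).show()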

I have also tried another example of a Teradata recursive query. Reference: etl-sql.com

CREATE TABLE vt_source
(
dept_no INT,
emp_name VARCHAR(100)
);

INSERT INTO vt_source values( 10,'A');
INSERT INTO vt_source values( 10,'B');
INSERT INTO vt_source values( 10,'C');
INSERT INTO vt_source values( 10,'D');
INSERT INTO vt_source values( 20,'P');
INSERT INTO vt_source values( 20,'Q');
INSERT INTO vt_source values( 20,'R');
INSERT INTO vt_source values( 20,'S');

CREATE TABLE vt_source_1 AS
SELECT dept_no,emp_name,
ROW_NUMBER() OVER(ORDER BY dept_no ASC,emp_name ASC) AS row_id
FROM vt_source;

Teradata Recursive Query – Example 2

WITH RECURSIVE str_concat(dept_no,emp_name ,row_id)
AS
(
SEL dept_no,emp_name ,row_id
FROM vt_source_1 tb1
WHERE tb1.row_id=1
UNION ALL
SEL tb1.dept_no,tb2.emp_name||','||tb1.emp_name ,tb1.row_id
FROM vt_source_1 tb1,str_concat tb2
WHERE tb2.row_id+1=tb1.row_id
)
SEL dept_no, emp_name AS emp_list FROM
(SEL dept_no,emp_name,row_id FROM str_concat)tb3
ORDER BY 1,2;

+-------+---------------+
|dept_no|       emp_list|
+-------+---------------+
|     10|              A|
|     10|            A,B|
|     10|          A,B,C|
|     10|        A,B,C,D|
|     20|      A,B,C,D,P|
|     20|    A,B,C,D,P,Q|
|     20|  A,B,C,D,P,Q,R|
|     20|A,B,C,D,P,Q,R,S|
+-------+---------------+

In Spark, we will follow the same steps for this recursive query too: run the seed statement once and put the iterative query in a while loop. Putting all the steps together, we get the following Spark code:

Complete Final Code – Example 2

//Step 0: Create dataframe in Scala
val df = spark.createDataFrame(Seq(
(10,"A",1),
(10,"B",2),
(10,"C",3),
(10,"D",4),
(20,"P",5),
(20,"Q",6),
(20,"R",7),
(20,"S",8)
)).toDF("dept_no", "emp_name","row_id")
df.createOrReplaceTempView("vt_source_1");

// Initialize counters
var df_cnt:Int = 1
var cnt: Int = 1

// fetch seed row
val df_employee_seed = df.filter("row_id=1")
df_employee_seed.createOrReplaceTempView("vt_seed0");

// run the while loop, iterating as long as the join keeps returning new rows
while (df_cnt != 0) {
  val tblnm  = "vt_seed".concat((cnt - 1).toString)
  val tblnm1 = "vt_seed".concat(cnt.toString)
  val df_employee_rec = spark.sql(s"SELECT tb1.dept_no, concat_ws(',', tb2.emp_name, tb1.emp_name) as emp_name, tb1.row_id FROM vt_source_1 tb1, $tblnm tb2 WHERE tb2.row_id + 1 = tb1.row_id")
  df_cnt = df_employee_rec.count.toInt
  if (df_cnt != 0) {
    df_employee_rec.createOrReplaceTempView(tblnm1)
  }
  cnt = cnt + 1
}

var fin_query = "";

// create final query by merging all datasets
for (a <- 0 to (cnt - 2)) {
  if (a == 0) {
    fin_query = fin_query
      .concat("select dept_no, emp_name from vt_seed")
      .concat(a.toString());
  } else {
    fin_query = fin_query
      .concat(" union select dept_no, emp_name from vt_seed")
      .concat(a.toString());
  }
}

//run final query
val final_result = spark.sql(s"$fin_query");
final_result.sort($"dept_no".asc,$"emp_name".asc).show();

+-------+---------------+
|dept_no|       emp_name|
+-------+---------------+
|     10|              A|
|     10|            A,B|
|     10|          A,B,C|
|     10|        A,B,C,D|
|     20|      A,B,C,D,P|
|     20|    A,B,C,D,P,Q|
|     20|  A,B,C,D,P,Q,R|
|     20|A,B,C,D,P,Q,R,S|
+-------+---------------+
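
One note on Step 0 above: I hard-coded row_id for brevity. If you instead start from the raw vt_source rows, the ROW_NUMBER() step from the Teradata setup can be reproduced with Spark's window functions; a sketch:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val vt_source = Seq(
  (10, "A"), (10, "B"), (10, "C"), (10, "D"),
  (20, "P"), (20, "Q"), (20, "R"), (20, "S")
).toDF("dept_no", "emp_name")

// Same ordering as ROW_NUMBER() OVER (ORDER BY dept_no ASC, emp_name ASC) in Teradata.
// Note: a window with no PARTITION BY pulls all rows into one partition,
// which is fine for a table this small.
val vt_source_1 = vt_source.withColumn(
  "row_id", row_number().over(Window.orderBy($"dept_no".asc, $"emp_name".asc)))

vt_source_1.createOrReplaceTempView("vt_source_1")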

In this way, I was able to convert simple recursive queries into equivalent Spark code. I hope this helps you too. I know it is not the most efficient solution, but I cannot think of any other way of achieving it. If you have a better way of implementing the same thing in Spark, feel free to leave a comment; I will be more than happy to test your method. Also, if you have any question about the process I have explained here, leave a comment and I will try to answer your queries.
