Teradata to PySpark – Replicate ACTIVITYCOUNT to Spark

Recently I was working on a project to convert Teradata BTEQ scripts to PySpark code. Since the scripts were mostly SQL queries, we were asked to translate them into Spark SQL and run them using PySpark. We mostly used sqlContext for the SQL queries. However, Teradata BTEQ has constructs like ACTIVITYCOUNT, which holds the number of rows affected by the previous statement and helps in deciding whether to run subsequent queries or not. These conditional constructs cannot be converted directly to equivalent Spark SQL. So in PySpark, we collected a dataframe into a list of rows using collect() and then checked the value at an index to implement the IF condition. Be very careful when collecting a dataframe: collect() brings every row to the driver, so the result must be very small or you will get memory-related errors.
Let’s see an example now:

Teradata : The code below checks whether any data was loaded into the staging table. If no data was loaded, the target table load is skipped.

DELETE FROM stg_usa_prez;

INSERT into stg_usa_prez select * from raw_usa_prez;

.if activitycount = 0 then .GOTO SkipTgtLoad;

INSERT into usa_prez select * from stg_usa_prez;
.LABEL SkipTgtLoad

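PySpark : The code below runs a count query on the staging table and brings the result to the driver using collect(). This is safe here because the output is only 1 row and 1 column. Since the staging table is overwritten just before the count, the count plays the same role as ACTIVITYCOUNT after the INSERT in the Teradata version. We then check the value in an IF condition to decide whether to run the subsequent queries.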

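# Reload the staging table (replaces the DELETE + INSERT pair from BTEQ)
sqlContext.sql("INSERT OVERWRITE TABLE stg_usa_prez select * from raw_usa_prez")
# collect() returns a list of Row objects; the count query yields exactly one Row
v_cnt = sqlContext.sql("select count(*) as tot_cnt from stg_usa_prez").collect()
# Equivalent of ".if activitycount = 0 then .GOTO SkipTgtLoad"
if v_cnt[0].tot_cnt > 0:
    sqlContext.sql("INSERT into usa_prez select * from stg_usa_prez")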
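
If a script has many such checks, the same pattern can be wrapped in a small helper. The sketch below is only one possible way to do it. The function names staging_row_count and load_target_if_staged are hypothetical and were not part of the project code. It assumes that whatever is passed as sql_runner has a sql() method returning an object with collect(), as sqlContext does.

```python
def staging_row_count(sql_runner, table):
    """Return the number of rows in `table` (hypothetical helper).

    `sql_runner` is assumed to be sqlContext or any object with a
    compatible sql() method.
    """
    rows = sql_runner.sql(
        "select count(*) as tot_cnt from {}".format(table)
    ).collect()
    # The count query returns exactly one row with one column,
    # so collecting it to the driver is cheap.
    return rows[0][0]


def load_target_if_staged(sql_runner, stg_table, tgt_table):
    """Load the target table only when the staging table has rows.

    Mirrors ".if activitycount = 0 then .GOTO SkipTgtLoad".
    Returns True if the target load ran, False if it was skipped.
    """
    if staging_row_count(sql_runner, stg_table) > 0:
        sql_runner.sql(
            "INSERT into {} select * from {}".format(tgt_table, stg_table)
        )
        return True
    return False
```

With the tables from the example above, the call would be load_target_if_staged(sqlContext, "stg_usa_prez", "usa_prez"). The table names are placed into the SQL text as-is, so this should only be used with trusted, hard-coded names.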

Hope this helps. Let me know if you have tried any other method to replicate the same functionality.
