Recently, I was working on one project where the ETL requirement was to have daily snapshot of the table. It was 15+ years old data model on which datawarehouse was designed and the client wanted to replicate it on Hadoop. So you can convert the ETL to Spark SQL however not everything works as-is on Spark. Also not everything as-is is best and efficient on Spark. Similarly another requirement was to create daily back-up of target table at the end of ETL chain. Even the SQL business logic used back-up table to refer to one day old data and current table for today’s data.
There was hard deadline by which client wishes to migrate multiple applications from Traditional RDBMS to Hadoop. He insisted on keeping same data model and same business logic on new platform too primarily for 2 reasons:
- Quick turnaround time and minimum time to market.
- Existing capability familiarity and comfort level
So to create back-up we were using “INSERT OVERWRITE TABLE tablename_bkup select * from tablename“. This obviously is straight forward and first thing which comes to mind if you have to take back-up of any table. However in Hadoop, we observed significant time was spent on creating table back-ups. For example, I was running a back-up for a table with 85 columns in it. Total rows were 32 Mn+ and size was 24.9 Gb (parquet snappy format) split across 375 file parts. When I run typical insert overwrite for this table it was taking around 40 mins to run. Remember, I was using shared queue with n numbers of other concurrent applications running on cluster.
I tried another approach to reduce this time to less than 20 seconds. Since in Hadoop everything is in files. So I tried moving files from one directory to another i.e. from Source table HDFS location to Target table HDFS location. It was way faster than INSERT-OVERWRITE still was taking good time around 8 mins. I wanted to try something better than this.
Next thing I tried was “DistCP” or distributed copy which invokes map-reduce jobs and copy data in parallel into target location across nodes/clusters. To my surprise, job completed in less than 20 seconds.
I am sharing below scala code to achieve it in spark-shell
import sys.process._ val sample_loc = spark.sql("desc formatted sample_07").toDF.filter($"col_name" like "Location%").select("data_type").collect()(0)(0).toString() val sample_bkp_loc=spark.sql("desc formatted sample_07_bkp").toDF.filter($"col_name" like "Location%").select("data_type").collect()(0)(0).toString() val return_code = "hadoop distcp "+sample_loc+" "+sample_bkp_loc+""! if(return_code == 0){ print("Overwrite complete") } else {print("Overwrite Failed")} spark.sql("analyze table sample_07_bkp compute statistics")
Target table: sample_07_bkp
Source table: sample_07
Good to run analyze table on Target table after this process in order to sync metastore with file part information. Else without this command you may not see proper data in the table.
From running a query which took ~40 mins via SQL I was able to come to ~20 sec with 2-3 lines of scala code by handling it programmatically.
With parquet snappy I was getting checksum mismatch errors. So to overcome it and speed up the copy process I ran truncate on Target table first. Then used -update -skipcrccheck option to complete it successfully.
val return_code = "hadoop distcp -update -skipcrccheck "+sample_loc+" "+sample_bkp_loc+""!
Let me know your experience if you have tried this.