We all know that HDFS does not support in-place updates or deletes of individual records. However, with Hive ACID properties enabled, we can run UPDATE and DELETE directly on Hive tables. Hive ACID tables support the UPDATE, DELETE, INSERT and MERGE query constructs, with some limitations that we will also discuss. I am using HDP 2.6 & Hive 1.2 for the examples mentioned below.
Points to consider while using Hive Transactional Tables:
1) Only the ORC storage format is supported presently.
2) The table must be bucketed, i.e. have a CLUSTERED BY clause.
3) Table properties must include "transactional"="true".
4) External tables cannot be transactional.
5) Transactional tables cannot be read by a non-ACID session.
6) The table cannot be loaded using the "LOAD DATA..." command.
7) Once a table is created as transactional, it cannot be converted back to non-ACID afterwards.
The following properties must be set on the client side to use transactional tables:
1) set hive.support.concurrency=true;
2) set hive.enforce.bucketing=true;
3) set hive.exec.dynamic.partition.mode=nonstrict;
4) set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
Also make sure that you are using Tez as the execution engine, as MR does not support ACID transactions.
set hive.execution.engine=tez;
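These SET commands apply only to the current session. You can confirm what a property is currently set to by issuing SET with just the property name, which prints its value, for example:
set hive.txn.manager;
set hive.support.concurrency;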
Let's begin by creating a transactional table:
Step 1: Create a Transaction table
CREATE TABLE usa_prez_tx(
  pres_id tinyint,
  pres_name string,
  pres_dob date,
  pres_bp string,
  pres_bs string,
  pres_in date,
  pres_out date)
CLUSTERED BY (pres_bs) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ("transactional"="true");
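To confirm that the table really was created as transactional, you can inspect its table properties (a quick sanity check; the exact output layout varies between Hive versions):
SHOW TBLPROPERTIES usa_prez_tx;
-- or, for full details including bucketing and storage format:
DESCRIBE FORMATTED usa_prez_tx;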
Step 2: Load data into Transactional table
insert into usa_prez_tx select * from usa_president;
The number of buckets you specify is the maximum number of file parts that will be generated in the output, which is also the number of reducers you will see in the job. In this case, 4 file parts will be created. Since we have defined the table as transactional, Hive will keep "delta" and "base" versions of the files. If we check in HDFS, we will see something like this:
hdfs dfs -ls -R /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00001
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00002
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00004
Also, you cannot load a data file directly into a transactional table. The reason is that the file is generally in TEXT format while the table is stored as ORC, so you will get the error below:
“The file that you are trying to load does not match the file format of the destination table. Destination table is stored as ORC but the file being loaded is not a valid ORC file.”
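The usual workaround, and effectively what Step 2 above does, is to LOAD DATA into a plain non-transactional staging table in text format and then INSERT ... SELECT into the ACID table (the usa_president table used earlier presumably plays this staging role). A rough sketch, assuming a comma-delimited file at /tmp/usa_president.csv (both the staging table name and the path here are hypothetical):
CREATE TABLE usa_president_stg(
  pres_id tinyint, pres_name string, pres_dob date, pres_bp string,
  pres_bs string, pres_in date, pres_out date)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH '/tmp/usa_president.csv' INTO TABLE usa_president_stg;
insert into usa_prez_tx select * from usa_president_stg;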
Now that we have loaded data into the table, let's try running a DELETE statement.
Step 3: DELETE some data from transactional table
DELETE from usa_prez_tx where pres_bs='Virginia';
8 records are deleted and the table now has 37 records. The HDFS directory will look like:
hdfs dfs -ls -R /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00001
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00002
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00004
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000/bucket_00002
You can see that an extra delta directory is created for "bucket_00002", which indicates that the deleted records belonged to the existing "bucket_00002".
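A quick way to confirm the delete took effect (the expected counts follow from the figures quoted above):
select count(*) from usa_prez_tx;                           -- should return 37
select count(*) from usa_prez_tx where pres_bs='Virginia';  -- should return 0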
Step 4: UPDATE data in transactional table
update usa_prez_tx set pres_out='2999-12-31' where pres_out is NULL;
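One restriction to keep in mind while updating: the bucketing column (pres_bs in this table) and any partitioning columns cannot themselves be updated, because that would require moving rows between buckets or partitions. For example, a statement like the following would be rejected (hypothetical, for illustration only):
update usa_prez_tx set pres_bs='New York' where pres_name='George Washington';  -- fails: pres_bs is the bucketing column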
The UPDATE on pres_out changes 1 record. The HDFS directory will look like:
hdfs dfs -ls -R /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00001
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00002
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00004
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000/bucket_00002
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000003_0000003_0000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000003_0000003_0000/bucket_00000
So you can see that for each operation a new delta directory is created, and these are tracked by the Hive metastore.
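These delta directories accumulate over time and are merged back together by compaction, which normally runs automatically in the background. If you want to trigger or monitor it manually, the following should work (compaction behaviour depends on the compactor settings in the metastore):
ALTER TABLE usa_prez_tx COMPACT 'major';
SHOW COMPACTIONS;
A 'major' compaction rewrites the base and all deltas into a new base directory, while a 'minor' compaction only merges the delta files.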
Step 5: MERGE data in transactional table
MERGE into usa_prez_tx as tb1
using usa_president as tb2
on tb1.pres_name = tb2.pres_name and tb1.pres_in = tb2.pres_in
WHEN MATCHED THEN UPDATE set pres_out = tb2.pres_out
WHEN NOT MATCHED THEN INSERT VALUES (tb2.pres_id,tb2.pres_name,tb2.pres_dob,tb2.pres_bp,tb2.pres_bs,tb2.pres_in,tb2.pres_out);
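As an aside, Hive's MERGE also supports a delete branch, so matched rows can be removed instead of updated. A hedged variant of the statement above (the pres_out conditions are purely for illustration and are not run here):
MERGE into usa_prez_tx as tb1
using usa_president as tb2
on tb1.pres_name = tb2.pres_name and tb1.pres_in = tb2.pres_in
WHEN MATCHED AND tb2.pres_out IS NOT NULL THEN UPDATE set pres_out = tb2.pres_out
WHEN MATCHED AND tb2.pres_out IS NULL THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (tb2.pres_id,tb2.pres_name,tb2.pres_dob,tb2.pres_bp,tb2.pres_bs,tb2.pres_in,tb2.pres_out);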
After running the Step 5 MERGE statement above, the HDFS directory will look like:
hdfs dfs -ls -R /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00001
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00002
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00004
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000/bucket_00002
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000003_0000003_0000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000003_0000003_0000/bucket_00000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0000/bucket_00000
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0000/bucket_00001
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0000/bucket_00002
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0000/bucket_00004
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0001
/apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0001/bucket_00002
So you can see that for every transaction a delta directory is created which tracks the changes. In the next post, we will look at the limitations of transactional tables.
Hi, I need to use the "Warehouse Connector Interfaces" to update a Hive ORC table from Spark. I want to use the MERGE statement. Is it possible to merge from a Hive external table into an ORC table via Spark?
Hi Sachi
I am yet to use the warehouse connector; however, I will give it a try and share my observations soon. I could also find some information related to your query at the link below:
https://docs.hortonworks.com/HDPDocuments/HDP3/HDP-3.1.0/integrating-hive/content/hive_configure_a_spark_hive_connection.html
—
Nitin
Were you able to achieve this?