Apache Hive

Hive Transactional Tables: Everything you must know (Part 1)

We all know HDFS does not support random deletes, updates. With HIVE ACID properties enabled, we can directly run UPDATE/DELETE on HIVE tables. Hive ACID tables support UPDATE, DELETE, INSERT, MERGE query constructs with some limitations and we will talk about that too. I am using HDP 2.6 & Hive 1.2 for examples mentioned below.

Points to consider while using Hive Transactional Tables:

1) Only ORC storage format is supported presently.
2) Table must have CLUSTERED BY column
3) Table properties must have : “transactional”=”true”
4) External tables cannot be transactional.
5) Transactional tables cannot be read by non ACID session.
6) Table cannot be loaded using “LOAD DATA…” command.
7) Once table is created as transactional , it cannot be converted to non-ACID afterwards.

Following properties must be set at Client Side to use transactional tables:

1) set hive.support.concurrency = true;
2) set hive.enforce.bucketing = true;
3) set hive.exec.dynamic.partition.mode = nonstrict;
4) set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

Also make sure that you are using TEZ as execution engine as MR does not support ACID transactions.

set hive.execution.engine=tez;

Let’s begin with creating a transactional table:

Step 1: Create a Transaction table

CREATE TABLE usa_prez_tx(
 pres_id tinyint,
 pres_name string, 
 pres_dob date, 
 pres_bp string, 
 pres_bs string, 
 pres_in date, 
 pres_out date)
 CLUSTERED BY (pres_bs) INTO 4 BUCKETS 
 STORED AS ORC
 TBLPROPERTIES ("transactional"="true")
 ;

Step 2: Load data into Transactional table

insert into usa_prez_tx select * from usa_president;

The number of buckets you will specify will be the maximum number of file parts that shall be generated in the output. Hence you can see that number of reducers in your job. In this case, 4 file parts will be created. Since we have defined table as transactional HIVE will keep “delta” and “base” versions of file. If we will check in HDFS we will see something like this:

hdfs dfs -ls -R /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00001
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00002
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00004 

Also you cannot load data file directly into transactional tables. The reason is the file generally is in TEXT format and table is in ORC format. Hence you will get below error :

“The file that you are trying to load does not match the file format of the destination table. Destination table is stored as ORC but the file being loaded is not a valid ORC file.”

Now we have loaded data into table. Let’s try to run some DELETE statement.

Step 3: DELETE some data from transactional table

DELETE from usa_prez_tx where pres_bs='Virginia';

8 records are deleted in the table and table will have now 37 records. HDFS directory will look like:

hdfs dfs -ls -R /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00001
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00002
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00004
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000/bucket_00002  

You can see an extra delta directory is created for bucket value “bucket_00002” which gives an impression that few records are deleted from existing “bucket_00002”.

Step 4: UPDATE data in transactional table

update usa_prez_tx set pres_out='2999-12-31' where pres_out is NULL;

1 record updated. HDFS directory will look like:

hdfs dfs -ls -R /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00001
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00002
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00004
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000/bucket_00002
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000003_0000003_0000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000003_0000003_0000/bucket_00000 

So you can see for each operation a delta directory is created and is maintained by hive metastore.

Step 5: MERGE data in transactional table

MERGE into usa_prez_tx as tb1 
 using usa_president as tb2 
 on tb1.pres_name = tb2.pres_name and tb1.pres_in = tb2.pres_in
 WHEN MATCHED THEN 
UPDATE set pres_out = tb2.pres_out
 WHEN NOT MATCHED THEN 
INSERT VALUES (tb2.pres_id,tb2.pres_name,tb2.pres_dob,tb2.pres_bp,tb2.pres_bs,tb2.pres_in,tb2.pres_out);

HDFS directory will look like:

hdfs dfs -ls -R /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00001
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00002
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000001_0000001_0000/bucket_00004
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000002_0000002_0000/bucket_00002
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000003_0000003_0000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000003_0000003_0000/bucket_00000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0000/bucket_00000
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0000/bucket_00001
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0000/bucket_00002
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0000/bucket_00004
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0001
 /apps/hive/warehouse/db_hiveql.db/usa_prez_tx/delta_0000004_0000004_0001/bucket_00002 

So you can see for every transaction, a delta directory is created which tracks the changes. In the next post, we will see what are the limitations with transactional tables.

5 thoughts on “Hive Transactional Tables: Everything you must know (Part 1)”

  1. Hi, I need to use “Warehouse Connector Interfaces” to update an Hive ORC table from Spark. I want to use Merge statement , is this possible to merge from a hive external table to orc table via spark?

Leave a Reply