Wednesday, November 20, 2013

Loading Data into Hive Tables


One important limitation of Hive is that it does not support row-level insert, update, and delete operations. So the only way to load data into a table is to use one of the bulk-load methods, or simply to write files into the correct directories.
By now you have learned how to create tables in Hive, and these tables may be managed tables or external tables. We have also seen that both managed and external tables can be partitioned. We learned some methods of loading data into these tables: for a managed table, whether partitioned or not, we used the LOAD DATA LOCAL statement; for an external table that is not partitioned, we used the LOCATION clause in the CREATE TABLE itself; and when the external table was partitioned, we used ALTER TABLE to add new partitions to it. I just wanted to summarize the data loading methods in one place, hence this post.
We know the LOAD DATA LOCAL statement can be used to load data into a managed table. If the table is partitioned you will also use the PARTITION clause; otherwise it is not required. Let's look again at the statement we used in one of my previous posts. I have placed the same screen here again.

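In case the screenshot is not visible, the statement looked roughly like the sketch below. The table name customers and the partition spec are placeholders I made up for illustration; only the source path comes from my setup.

  -- table name and partition value below are placeholders
  LOAD DATA LOCAL INPATH '/home/pkp/bang-data.txt'
  INTO TABLE customers
  PARTITION (city = 'bangalore');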
A few things need to be added here.
  1. You already know that LOAD DATA will create the appropriate target directory in the Hive warehouse and copy the data file there. If the directory already exists, Hive will not create it. File names remain the same.
  2. Hive does not perform any verification of the data for compliance with the table schema.
  3. If the LOCAL keyword is used, the source path is assumed to be in the local file system. If it is omitted, the path is assumed to be in HDFS. There is a catch here: without LOCAL, Hive will move the data file instead of copying it, as it does with LOCAL. This is because Hive assumes that you do not want duplicate copies in HDFS. If you don't want the file moved, use it where it is via the external table method.
  4. '/home/pkp/bang-data.txt' is a file name, but by convention we provide a directory name, which allows Hive to copy all files in that directory at once. We already know that file names remain the same when Hive moves them into the warehouse.
  5. If a file being copied by Hive already exists in the target directory, it will be overwritten, while new files that do not exist there will simply be copied in. If you want to clean the target directory before moving any data, you can use OVERWRITE INTO TABLE in the second line of the above screenshot.
  6. We already know we can use the PARTITION clause in the LOAD DATA statement, but the example we saw was for just one partition, which doesn't make it very useful. We learned the ALTER TABLE command earlier to add new partitions to an external table, but ALTER TABLE is equally valid for a managed table. So if we have 10 partitions to add, we can use a single ALTER TABLE command to add them all at once, as explained in the previous post and sketched below.
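For example, a single statement along these lines adds several partitions at once; the table name emp and the partition values here are just placeholders for illustration.

  -- one ALTER TABLE can add many partitions in one shot
  ALTER TABLE emp ADD
    PARTITION (country = 'US', state = 'Texas')
    PARTITION (country = 'US', state = 'Washington')
    PARTITION (country = 'CA', state = 'Ontario');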
This completes some missing bits of what we already learned about loading data into Hive. You might have noticed by now that I am not trying to explain syntax or cover every detail of the options available for the various commands; that is available on the Apache Hive wiki. We are after the concepts and key capabilities of this tool called Apache Hive.
Let's learn a new method for loading data into Hive tables.
In a previous post, I mentioned external tables. They are mostly used by mapping them into Hive; once mapped, we execute our queries on them and prepare reports, and once done, we un-map them from Hive using a DROP TABLE statement. In that case, Hive doesn't remove the data but only removes the metadata. There may be requirements where we want to load data from those external tables into Hive tables. This is typically needed during an ETL process, when we want to transform staged data from an external table using the Hive query language and load it into a Hive table in a new structure. It is not necessary that we do this only with external tables; we may also need to transform a managed table into a new structure and store it as a new table. The question here is: does Hive provide any method to load data from an existing table (managed or external) using a Hive SELECT statement? The answer is yes. Let's demonstrate this. We will use a simple SELECT * statement in this demonstration, without transformation, to keep the focus on the loading method instead of complicating it with transformation logic. In a real-life scenario we may not need such simple loads without transformation; the transformation may be anything, from simple aggregation to some complex logic.
Let's start by looking at the data I prepared for this demonstration. I have a plain text file of employee records with comma-separated fields as listed below.
employee_id, first_name, last_name, email, hire_date, salary, department_name, country_abr, state_province, city, street_address, postal_code
There are 105 records in this file. We will have a special focus on country and state, because I want to demonstrate partitioning on these two columns. The country- and state-wise record counts in my file are shown in the table below. I hope you can prepare such a file easily, but if you need to use my file, it is shared here.
Country Abbreviation   State        Number of Records
US                     Texas         5
US                     Washington   18
US                     California   45
CA                     Ontario       2
UK                     Oxford       34
DE                     Bavaria       1
I have moved my data file (hr-data.txt) into the HDFS directory /user/pkp/hr as shown in the screen below.


In the next step, I created an external table as shown in the screen below and queried the first record to test whether my table was created correctly and the records are mapped properly into fields. We know Hive does not check the data against the table schema and will read whatever is in the data file, so it is almost always necessary to test whether the table schema and the data are in compliance.
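In case the screen is not readable, the table definition was along the lines of the sketch below. The column names and the location come from this post; the column types are my assumptions, so adjust them to your data.

  -- external table mapped onto the file already sitting in /user/pkp/hr;
  -- column types are assumptions for illustration
  CREATE EXTERNAL TABLE employees (
    employee_id     INT,
    first_name      STRING,
    last_name       STRING,
    email           STRING,
    hire_date       STRING,
    salary          DOUBLE,
    department_name STRING,
    country_abr     STRING,
    state_province  STRING,
    city            STRING,
    street_address  STRING,
    postal_code     STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION '/user/pkp/hr';

  -- quick sanity check that fields map correctly
  SELECT * FROM employees LIMIT 1;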

Before we load data into a Hive table, let's create one. We will name it emphive and keep the structure the same, since we are not doing any transformation. But we have to remove the country and state columns from our Hive table, because we want to partition the table on these columns. We already know that Hive appends partition columns at the end; they are not actual columns but virtual columns deduced from the partitioning metadata. This is good in the sense that it removes redundant data from the files. So let's create the emphive table with the same structure, excluding the country and state columns from the table schema and placing them in the partition clause. You have already learned this, but just for completeness, I added a screenshot below as a reference.
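A rough sketch of that definition, with the same assumed column types as before:

  -- same columns as employees, minus country_abr and state_province,
  -- which move into the PARTITIONED BY clause as virtual columns
  CREATE TABLE emphive (
    employee_id     INT,
    first_name      STRING,
    last_name       STRING,
    email           STRING,
    hire_date       STRING,
    salary          DOUBLE,
    department_name STRING,
    city            STRING,
    street_address  STRING,
    postal_code     STRING)
  PARTITIONED BY (country STRING, state STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';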

The groundwork is done. Let's load data into emphive from the external table employees. We will load a single partition for now. The command used for this is shown below.
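If the screen is hard to read, the statement was essentially the following; we load DE/Bavaria first, which the record counts later will confirm.

  -- load one static partition from the external staging table
  INSERT OVERWRITE TABLE emphive
  PARTITION (country = 'DE', state = 'Bavaria')
  SELECT employee_id, first_name, last_name, email, hire_date,
         salary, department_name, city, street_address, postal_code
  FROM employees
  WHERE country_abr = 'DE' AND state_province = 'Bavaria';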

It throws a lot of messages, and looking at those messages you will find that it runs 3 MapReduce jobs to accomplish this task. Finally the data is loaded into our table, so let's query the newly loaded table. You will find in the screen below that only one record is returned, which is as expected. You should also notice that the returned record shows country and state as the last two columns.
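A simple query like this is enough for the check:

  SELECT * FROM emphive;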


With OVERWRITE, any previous contents of the partition (or of the whole table, if not partitioned) are replaced. If you replace OVERWRITE with INTO, Hive appends the data rather than replacing it.
Let's load data for two more partitions. But how? Do I have to write the INSERT statement twice? No, there is another way: with this method we read the source table once and write into as many partitions as we want. Let's see how it works in the screen below.
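The multi-insert form reads the source once and fans the records out to both partitions. Roughly, it looked like this:

  -- scan employees once, write two partitions in one statement
  FROM employees e
  INSERT OVERWRITE TABLE emphive PARTITION (country = 'US', state = 'Texas')
    SELECT e.employee_id, e.first_name, e.last_name, e.email, e.hire_date,
           e.salary, e.department_name, e.city, e.street_address, e.postal_code
    WHERE e.country_abr = 'US' AND e.state_province = 'Texas'
  INSERT OVERWRITE TABLE emphive PARTITION (country = 'CA', state = 'Ontario')
    SELECT e.employee_id, e.first_name, e.last_name, e.email, e.hire_date,
           e.salary, e.department_name, e.city, e.street_address, e.postal_code
    WHERE e.country_abr = 'CA' AND e.state_province = 'Ontario';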


The above statement will again spawn multiple MapReduce jobs and throw several messages before it completes. Let's query our emphive table again to check what is there now.
It now shows 8 records as expected: five records for US/Texas and two for CA/Ontario, which we just loaded, plus one record for DE/Bavaria, which we loaded earlier.
The above statement reads all records from the source table employees; each record is then evaluated against each SELECT … WHERE … clause. If a record satisfies a given SELECT … WHERE … clause, it gets written to the specified table and partition. This statement is more powerful than it appears: each INSERT clause is independent, so each can insert into a different table, and some of those tables may be partitioned while others are not. Try it yourself.
Let's move further and learn about dynamic partitioning. In my external table, I had six combinations of country/state. If I want to load all of the data into the respective partitions of my Hive table, I have to write a long statement. The situation would be more daunting still with a file containing 100 such combinations, resulting in as many partitions. That's where dynamic partitioning comes to the rescue.
Dynamic partitioning in Hive is controlled by configuration variables. I have used the Hive SET command to display the current values of these variables.
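You can display them one by one like this; SET with only a property name prints its current value.

  SET hive.exec.dynamic.partition;
  SET hive.exec.dynamic.partition.mode;
  SET hive.exec.max.dynamic.partitions.pernode;
  SET hive.exec.max.dynamic.partitions;
  SET hive.exec.max.created.files;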


All of the above variables are briefly explained in the table below. Their default values are already displayed in the screen above.
Variable Name                              Description
hive.exec.dynamic.partition                true/false to enable/disable dynamic partitioning.
hive.exec.dynamic.partition.mode           strict/nonstrict, as explained below.
hive.exec.max.dynamic.partitions.pernode   Maximum number of partitions each mapper/reducer can create.
hive.exec.max.dynamic.partitions           Maximum number of partitions a single statement may create.
hive.exec.max.created.files                Maximum number of files that can be created globally.


All of the above variables are there to protect the system from being burned out by faulty code. In my environment, hive.exec.dynamic.partition is already true, so I should be able to create dynamic partitions, but hive.exec.dynamic.partition.mode is strict. This will impose some restrictions on us (we will see them later), so let's change it to nonstrict using the command below.

  SET hive.exec.dynamic.partition.mode=nonstrict;
Now we are ready to demonstrate dynamic partitioning in Hive. Before we start, let me warn you that there is a catch here; I will explain it after walking through the dynamic partitioning shown in the screen below.
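If the screen is not readable, the statement was essentially the following; note where country_abr and state_province sit in the select list.

  -- fully dynamic partitioning: no static partition values
  INSERT OVERWRITE TABLE emphive
  PARTITION (country, state)
  SELECT employee_id, first_name, last_name, email, hire_date,
         salary, department_name, city, street_address, postal_code,
         country_abr, state_province  -- partition columns last, in PARTITION-clause order
  FROM employees;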

The above statement does several things.
  1. It removes all data from my table emphive. This is caused by the OVERWRITE clause.
  2. It reads all records from the source table employees. This is caused by SELECT … FROM.
  3. It creates a new directory representing a new partition/sub-partition for every country/state, if the directory does not already exist. This effect is caused by the PARTITION clause.
  4. It creates a new file to hold the data for each specific partition inside the appropriate directory. This is the first time we see Hive creating files; so far it has only copied them.
  5. It inserts the data into the appropriate files.
You can go back to the HDFS file system and check that 4 directories got created for the four countries in our example data file. If you look inside the country directories, you will find subdirectories created for each state. You may also execute some SELECT statements with country and state conditions in the WHERE clause to test your work.
Let's talk about the catch I mentioned. Did you notice it already? If yes, you are getting Hive now. Anyway, the catch is those last two columns, country and state, in my screen above. They are necessary, at the end, and in that specific order. You cannot exclude them or change their order: Hive assumes the partition columns are at the end of your SELECT statement, in the same order as you specified them in your PARTITION clause.
One last thing about dynamic partitioning I need to mention: why we changed to nonstrict mode using the SET hive.exec.dynamic.partition.mode=nonstrict; command.
Strict mode puts a restriction on us: in this mode, we can't use full dynamic partitioning. What does that mean? It means I am forced to make at least one leading partition column static. Let's look at the new statement I would need if I had to work in strict mode.
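Under strict mode the statement would look roughly like this:

  -- strict mode: the leading partition column must be static
  INSERT OVERWRITE TABLE emphive
  PARTITION (country = 'US', state)
  SELECT employee_id, first_name, last_name, email, hire_date,
         salary, department_name, city, street_address, postal_code,
         state_province  -- only the dynamic partition column stays in the list
  FROM employees
  WHERE country_abr = 'US';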

You can notice the difference. I am forced to make three changes.
  1. In the PARTITION clause, I must specify at least one leading partition value, as I did with country='US'.
  2. As a result, I had to remove the country column from the select list, since Hive assumes all records are for country='US'.
  3. Further, I am forced to place a WHERE clause in my SELECT statement to restrict it to US only, because Hive assumes all records are for the US.
This ends my discussion about loading data into Hive tables.