Sunday, December 22, 2013

Apache Flume – An Introduction


Enough reading; I tried to build a video blog. It’s my first attempt at creating a video, so please check it out and provide your feedback.
I have planned a series of four sessions to introduce the basics of Apache Flume; this is the first of them.

Monday, December 16, 2013

Apache Sqoop – An Introduction

Hadoop as a platform gives us a framework to process data in parallel using the powerful MapReduce framework. Hive and Pig give us high-level tools to perform analysis and transformations very efficiently on very large data sets. But to do all this, we need the data to be available in HDFS. Data comes from various sources, and most of our traditional data lives in RDBMS systems. We need to bring it into HDFS to perform analysis, and we may further need to send final results or transformed data back to the RDBMS to take advantage of RDBMS capabilities.

There are various ways to move data back and forth between an RDBMS and HDFS. You can write scripts or small programs using many of the available technologies to extract data from your RDBMS and vice versa. But this is a repetitive task, and you have to handle various challenges such as connectivity, native RDBMS syntax, and efficiency. This was the motivation for an open source tool to simplify the process, and the answer was Sqoop. Sqoop is short for SQL to Hadoop.

It is a simple command line tool up to Sqoop 1.4.4, and Sqoop2 brings a web interface as well. You can do a lot with Sqoop. For example, you can import (RDBMS to HDFS) a single table, import all tables, use SQL queries to import data, run incremental imports, and so on. You can export (HDFS to RDBMS) data, update an existing data set, merge (update and insert together), or use a stored procedure instead of insert statements. But the bottom line is, Sqoop is a tool to handle data movement between an RDBMS and HDFS.
We will see some simple examples to get started with Sqoop. For demonstration, I am using the Hortonworks Sandbox 2.0. If you want to play along and don’t have any Hadoop environment, you can get the sandbox. Steps to set up this box are shared here.
The Hortonworks Sandbox comes with MySQL preinstalled, so let’s use it to create one table and load data from that table into HDFS.
As shown in the screen below, once you log in to the sandbox, just type mysql and you will get the MySQL prompt.
At the MySQL prompt, I typed the show databases; command and it shows that we have 2 databases. Let’s not touch information_schema. We will use the test database, as shown in the next statement. This database does not have any tables, so we have to create one sample table.
As shown in the screen below, I have created one table named mysql_data and inserted two records into it.
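For reference, here is a minimal sketch of that MySQL session. Only the database name test and the table name mysql_data come from the original screens; the column names and sample values are my assumptions.

    mysql> show databases;
    mysql> use test;
    mysql> create table mysql_data (id int, name varchar(50));
    mysql> insert into mysql_data values (1, 'first record');
    mysql> insert into mysql_data values (2, 'second record');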



We will import this table into HDFS. The size of the data in my example is very small, but it’s good enough to show how Sqoop works. The sandbox comes with all the basic drivers and connectors for Sqoop to connect with MySQL, so we do not need anything extra. We are ready to trigger the Sqoop command as shown below.
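A minimal sketch of that command is shown below. The host name localhost is my assumption, and depending on your MySQL setup you may also need --username and --password options.

    sqoop import --connect jdbc:mysql://localhost/test --table mysql_data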
The above command is the simplest form of a Sqoop import from a MySQL database into HDFS. import is the subcommand which instructs Sqoop to start an import into HDFS. The next part is --connect, which specifies the JDBC connection string. Finally, we specify the name of the table to be imported. Let’s first check what is created by this command before we get into further details.
Once executed, the above command will create a new directory in my home directory on HDFS. The name of this new directory will be the same as the table name. Let’s check what is created in this directory.
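You can inspect the result with commands like the ones below. This is a sketch; the part file names follow Sqoop’s usual part-m-NNNNN convention but may differ on your system.

    hadoop fs -ls mysql_data
    hadoop fs -cat mysql_data/part-m-00000
    hadoop fs -cat mysql_data/part-m-00001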
Sqoop has created two files, each containing one record from the table. That’s why I mentioned that my test data is very small but good enough to explain what is happening inside Sqoop. Sqoop generates a Java program to move data from MySQL into HDFS. This Java program is nothing but a Mapper program, which spawns multiple map tasks to move data in parallel. In this case there were only two mappers, and each created a separate file in the same directory. We will discuss this again in more detail.
Let’s execute another example to import data from Oracle. If you already have an Oracle instance, you can use it. If you do not, I have detailed the steps to install Oracle XE into your Hortonworks sandbox.
Download Oracle Express Edition from the link below.
You need to download Oracle XE for Linux x64 as shown in the screenshot below.
You will get a zip file which you need to move to your Linux VM. I used WinSCP to move the file from my laptop to the Linux VM. You can use the same or any other tool of your choice.
After copying the compressed installer into your VM, you need to unzip it as shown in the screen below. It will create a new directory Disk1 and various other subdirectories inside it.
Change your present working directory to Disk1 and execute the installer using rpm -ivh. Check the screen below if you have any doubts.
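A sketch of those steps is shown below. The exact zip and rpm file names depend on the Oracle XE release you downloaded, so treat them as placeholders.

    unzip oracle-xe-11.2.0-1.0.x86_64.rpm.zip
    cd Disk1
    rpm -ivh oracle-xe-11.2.0-1.0.x86_64.rpm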
I performed all these activities as the root user. Once the rpm installation is complete, your installation is done. You will get a message to execute oracle-xe configure, as shown in the screen above, to complete the post-installation steps. Go ahead and execute it. This will configure Oracle XE and ask you a few questions such as port numbers and passwords. Just accept all default port numbers and give an appropriate password.
It may take 10-15 minutes to complete the configuration. Once configuration is complete, you need to set the necessary environment variables. Oracle XE provides a script, oracle_env.sh, to set them up. This script is placed in the /u01/app/oracle/product/11.2.0/xe/bin directory. You can copy this script into the /etc/profile.d directory so that it is automatically executed whenever any user logs in to the system.
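For example, a sketch assuming the default Oracle XE 11.2 install path mentioned above:

    cp /u01/app/oracle/product/11.2.0/xe/bin/oracle_env.sh /etc/profile.d/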
We are done with the Oracle XE installation. Let’s log out of the root user and log in using your working credentials. Check the screen below: after login I executed the sqlplus system command to log in to my Oracle XE database using the system user account. I gave the password which I supplied during installation and finally got the SQL prompt.
 
The next screen shows the command to create a new user with the password hadoop. I gave the DBA role to this user.
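A sketch of those statements; the user name pkp and password hadoop come from the post, and granting DBA is a shortcut suitable only for a sandbox.

    CREATE USER pkp IDENTIFIED BY hadoop;
    GRANT DBA TO pkp;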
Now I will connect to Oracle XE again using the pkp/hadoop credentials, create a sample table, and insert two records into it. The commands to do all this are shown in the screen below.
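A minimal sketch of that session. The table name and its columns are my assumptions; only the two-record setup comes from the post.

    CONNECT pkp/hadoop
    CREATE TABLE oracle_data (id NUMBER, name VARCHAR2(50));
    INSERT INTO oracle_data VALUES (1, 'first record');
    INSERT INTO oracle_data VALUES (2, 'second record');
    COMMIT;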
At this stage, my source table is set up and I am ready to import data from Oracle into HDFS. But first I have to get the JDBC driver for Oracle. The JDBC driver for MySQL was already present in the sandbox, so I did not perform this step earlier, but the JDBC driver for Oracle is not present in my sandbox. It is freely available for anyone to download. You can download the JDBC driver for Oracle from here and place it into the Sqoop library directory, which is /usr/lib/sqoop/lib. The screen below shows that I have already placed ojdbc5.jar at the appropriate place.
We are ready to execute the Sqoop import command. The command to complete the import is shown below. You will notice that the JDBC connection string has changed to an Oracle-specific connection string.
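A sketch of the command, assuming the default Oracle XE SID of XE on port 1521 and the oracle_data table created above:

    sqoop import --connect jdbc:oracle:thin:@localhost:1521:XE \
      --username pkp --password hadoop --table ORACLE_DATA -m 1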
I had to provide a username and password as well, because Oracle does not allow access without proper credentials. You will also notice that I gave one extra parameter, -m 1. What is that? This option tells Sqoop to create only one mapper, since we know our table is very small and we don’t need multiple mappers to import in parallel.
As expected, the above Sqoop command will again create a directory as shown below. You will notice that this time only one file is created by Sqoop, and it contains both records from the table. This happened because we restricted Sqoop to a single mapper.
We have seen Sqoop in action; now it’s time to understand how a Sqoop import works at a high level. It’s a two-step process.
  1. Sqoop connects to the RDBMS to gather the necessary metadata about the data being imported. In my examples above, I only gave Sqoop a table name, but to accomplish the task Sqoop needs to know the column names, their data types, sizes, and so on. That is the metadata Sqoop gathers from the RDBMS in the first step.
  2. The second step is to submit multiple map-only Hadoop jobs to your cluster. These jobs perform the actual data transfer. They are Java mapper jobs which already know the metadata and the source RDBMS; they connect to the RDBMS and copy their part of the data into a separate file in HDFS. By default, fields in these files are comma-delimited and rows are newline-terminated, but you can easily override these defaults. In fact, we can move data from the RDBMS directly into Hive or HBase tables, as sketched below.
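For example, something like the command below overrides the field delimiter and loads the data straight into a Hive table. This is a sketch reusing the earlier MySQL example; adjust the connection string for your environment.

    sqoop import --connect jdbc:mysql://localhost/test --table mysql_data \
      --fields-terminated-by '\t' --hive-import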
This is just the beginning of Sqoop, and there is a lot more to learn. Sqoop 2 is coming with more power and flexibility. If you are interested, here is a fantastic video about Sqoop 2 features directly from the Sqoop developers.
If you followed this article, you now have an environment where you can play with Sqoop commands and options. Keep learning.


Tuesday, December 10, 2013

Oracle SQL - Analytical Functions

Why?
Why do I need to write about analytical functions?
Several people have asked me to write about analytical functions in Oracle SQL. Many great articles and explanations about them are already available, but I find that very few people understand them and use the power they deliver. During my recent recruitment drive, I found a lot of people who have been writing SQL for 4-5 years but have never used a single analytical function. Some have actually used 2-3 of them, but they don’t understand them properly and just remember a few specific problems they solved with them.


To be efficient in delivering technology solutions, you must know and understand what tools are available to you and what kinds of problems they solve. If you are unaware of those tools, you may still be able to solve your problem, but it may not be an optimal solution and may cost unnecessary extra effort, time, and money. Often I find people in my own team showing me their solutions with pride, expecting some appreciation, because they think they solved a unique problem in a unique way. Unfortunately, I may have to break their hearts by telling them that there is a much easier solution which they could have achieved in a fraction of the time and with far greater efficiency. The analytical functions provided by Oracle SQL are one such tool you must understand properly to avoid such mistakes.

Start with Aggregation Basics
To explain analytical functions, I have to start with aggregate functions. Aggregate functions are easy to understand and almost every SQL developer uses them often. Some of the most common aggregate functions are COUNT(), AVG(), MAX(), MIN(), etc. Haven’t you used SQL like the statements below?
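Something like these two queries against the employees table of the HR schema, which is the schema used throughout this post; a sketch of the queries behind the screenshots.

    SELECT COUNT(*) FROM employees;

    SELECT job_id, COUNT(*) FROM employees GROUP BY job_id;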



If you haven’t seen such SQL, step back and learn aggregate functions before you try to learn analytical functions.
Let’s look at some of the above SQL statements and their results. I will use Oracle Express Edition, which comes with the preloaded HR schema. If you want to play with it yourself, you can get it free from OTN and install it on your machine in less than 10 minutes.
Let’s look at the first SQL and its output.



What do you get as output? I got just one row telling me the total number of records in the table. Isn’t that correct? That’s what I wanted.
Let’s look at the second SQL.



I get 19 rows, each telling me the total number of records for each unique JOB_ID. That’s again correct. The COUNT() function gives me the total number of rows, and GROUP BY restricts COUNT() to work only on a subset of the entire table.

Aggregation is loss of details
But do you notice a loss of detail in these query results? Aggregate functions are fantastic, but they hide all the details behind them. I had 107 records in my table and several columns. I wanted to count employees without losing any details. My expectation was a result something like the one shown below. I need the count highlighted in red, but I want all the rows and columns as well. How do I get that?



That’s where analytical functions play their role. I can get the above result using the simplest form of an analytical function. The above result is generated by the analytical query shown below.
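A sketch of that query; an empty OVER () clause makes COUNT(*) an analytical function computed over the whole result set.

    SELECT first_name, last_name, job_id, salary,
           COUNT(*) OVER () AS total_count
    FROM   employees;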

 

Partition By – When do we need it?
What about the grouping problem? I want to count the number of employees sharing the same job_id without losing any details. We have seen similar results using an aggregate function and GROUP BY, but in that solution I lost all the rows and columns.
The grouping problem is solved using the PARTITION BY clause of analytical functions. To answer “count of employees sharing the same job_id”, we can partition the data by job_id; the screen below shows how to do this.
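A sketch of the partitioned query:

    SELECT first_name, last_name, job_id, salary,
           COUNT(*) OVER (PARTITION BY job_id) AS job_count
    FROM   employees;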



You can see that my data is partitioned on JOB_ID and the COUNT() function is restricted to each partition. This is exactly the same as what we get using GROUP BY with an aggregate function, but I haven’t lost any details. You can extend this partitioning to multiple columns, similar to what you do with GROUP BY on multiple columns.
You may ask me a question: why do I need to have my entire data and still have some aggregates calculated on it?
I have seen several people write complex SQL and save the results in MS Excel. We often get requests from business users to give them data in MS Excel; they then analyze that data in Excel by adding additional columns. That’s exactly what analytical functions will do for us.
To summarize what we have discussed so far: “Whenever you need aggregates on the result set of your query without losing anything from that result, you can start thinking of analytical functions.”

Order By – When do we need it?
Let’s pick one partition from the above example.
In this partition, I have five employees and they all share the same job title, FI_ACCOUNT, but their salaries are different. Luis is the lowest paid and Daniel is the highest paid. I need to calculate the delta salary for every employee compared with the highest paid employee.
Difficult, isn’t it? I see a simple 3-step approach to get this for each partition.
  1. Order the partition by salary in decreasing order.
  2. Pick the first value, which will be the highest salary.
  3. Calculate the difference between the highest salary and each employee’s salary.
Simple, isn’t it? The only thing you need to know is how to perform each of these steps using analytical functions.
  1. You can extend PARTITION BY with ORDER BY.
  2. Instead of COUNT(), you can use the FIRST_VALUE() function.
  3. Calculating the difference is the simplest part, and I don’t need to tell you that.
Let’s look at the SQL and results. The highlighted part of the SQL gets you the highest salary within each partition; the rest is just a calculation.
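A sketch of that query:

    SELECT first_name, last_name, job_id, salary,
           FIRST_VALUE(salary) OVER (PARTITION BY job_id ORDER BY salary DESC) AS highest_salary,
           FIRST_VALUE(salary) OVER (PARTITION BY job_id ORDER BY salary DESC) - salary AS delta_salary
    FROM   employees;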



We have seen the combined effect of ORDER BY and FIRST_VALUE; you must also understand what each does individually. ORDER BY just orders my data within the partition, and FIRST_VALUE picks the topmost record. We did an ORDER BY DESC, so the first record is the highest salary and FIRST_VALUE picks that. If you order ASC instead, the first record will be the lowest salary and FIRST_VALUE will pick the lowest one.
You can use ORDER BY without PARTITION BY if you want delta salaries across all employees. Try it yourself.

Order By – careful, cautious
It is important to note that the behavior of a function might change when it is used with an ORDER BY clause. You have to understand the specific behavior of any aggregate function you use with ORDER BY; a lack of understanding here will cause confusion. Let’s take an example with the COUNT() function, and I bet you will be surprised and confused.
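A sketch of the query behind the screenshot below, adding ORDER BY to the earlier PARTITION BY example:

    SELECT first_name, last_name, job_id, salary,
           COUNT(*) OVER (PARTITION BY job_id ORDER BY salary) AS running_count
    FROM   employees;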

 

  1. Highlighted in green - COUNT() is showing cumulative counts: the first record shows 1, the second record counts the first and second rows and hence shows 2, and so on.
  2. Highlighted in red - both rows show 2. Why is the behavior different from the partition highlighted in green? Because we ordered by salary and there is a tie on salary.
If you don’t understand this behavior, don’t worry, as I will explain it again; just follow the principle of not carrying a gun if you don’t need it. I mean, don’t use ORDER BY unless the order is actually important to you. When I want to calculate a count, the order within a partition is not relevant, so I will never use ORDER BY while calculating a count. Whereas when finding the maximum salary, the order was important because I wanted to take the first value after ordering, so I used ORDER BY.

Order By – Ensure deterministic ordering
Sometimes you need an ORDER BY because it is important, but your function’s behavior gives you trouble. Let’s take an example. You need a running total of employee salaries in order of first_name. Getting a running total is not possible unless we have a defined order of records. The SQL and results for this are shown below.
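A sketch of the running-total query:

    SELECT first_name, salary,
           SUM(salary) OVER (ORDER BY first_name) AS running_total
    FROM   employees;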



The above results look perfect. Let me change the question slightly: we need a running total of employee salaries in order of increasing salary. As expected, I just changed the ORDER BY from first_name to salary. The new SQL and results are shown below.
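The same sketch with the changed ORDER BY:

    SELECT first_name, salary,
           SUM(salary) OVER (ORDER BY salary) AS running_total
    FROM   employees;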



It’s broken, and equal salaries are the cause. That’s what I wanted to highlight: you need to be careful when using ORDER BY in analytical queries. Now let’s understand why both of these rows show the same value. We ordered by salary; the salaries for Steven and Hazel are the same, so Oracle is not sure which record it should apply the SUM() function to first. My ORDER BY clause is not able to ensure deterministic ordering. In that case the analytical function is applied to both rows together and the same value is returned for both.
How can we fix the above problem? We know the reason why it happened, so let’s fix that reason. If I add another column to the ORDER BY, I will be able to ensure deterministic ordering. The solution is shown below.
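A sketch of the fix, breaking ties on first_name:

    SELECT first_name, salary,
           SUM(salary) OVER (ORDER BY salary, first_name) AS running_total
    FROM   employees;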




Windowing
We have seen that the PARTITION BY clause creates a work window for analytical functions and the ORDER BY clause orders the records within that window. Finally, the analytical function is applied to the records within the window cumulatively: first to the first record, then to the first and second, then to the first three, and so on. What does that mean? Let’s look at one example to see what I am trying to explain.
Check the example below, which shows a running total of salaries in order of salary and first_name. It is the same example we used earlier, with the same results, but with an additional RANGE clause in the query.
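A sketch of that query with the windowing clause spelled out:

    SELECT first_name, salary,
           SUM(salary) OVER (ORDER BY salary, first_name
                             RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total
    FROM   employees;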



Do you notice RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW? This clause tells the SUM() function which window to use: it defines a window from the first record (UNBOUNDED PRECEDING) up to the current row. This is the default windowing clause of analytical functions, which is why your query behaves exactly the same even if you omit it.
The above explanation hints that we can play with the windowing clause to redefine our window, which by default starts at the first row and ends at the current row. You have two options: RANGE BETWEEN <Start> AND <End> and ROWS BETWEEN <Start> AND <End>. RANGE defines a logical window and ROWS defines a physical window. We will see examples of both types, but before that we need to understand how we define the start and end of the window.
We have following methods to define start.
  1. Start at the first row of the partition* - UNBOUNDED PRECEDING
  2. Start at the nth row above current row - value_expr PRECEDING
  3. Start at the current row - CURRENT ROW
  4. Start at nth row after the current row - value_expr FOLLOWING
*-Note that if you don’t have any partition, then entire set is considered as single partition.
We have following methods to define end.
  1. End at the last row of the partition - UNBOUNDED FOLLOWING
  2. End at the nth row after current row - value_expr FOLLOWING
  3. End at the current row - CURRENT ROW
  4. End at the nth row above current row- value_expr PRECEDING
You must take care that the window start evaluates to a point at or before the window end. This means that starting at CURRENT ROW and ending one row before the current row (1 PRECEDING), or any such combination, is not valid.
Now it’s time for an example. We will first look at ROWS BETWEEN, because a physical window of rows is easier to understand than a logical one. In the example below, you can see that I am able to access the previous and next physical rows for each current row. This kind of access is almost impossible to achieve without a loop in a procedural language like PL/SQL.
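A sketch of the query behind the screenshot: FIRST_VALUE over a one-row-preceding window returns the previous row’s salary, and LAST_VALUE over a one-row-following window returns the next row’s salary.

    SELECT first_name, salary,
           FIRST_VALUE(salary) OVER (PARTITION BY job_id ORDER BY salary
                                     ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS previous_row,
           LAST_VALUE(salary)  OVER (PARTITION BY job_id ORDER BY salary
                                     ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS next_row
    FROM   employees;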



Let’s look at the logical RANGE example.
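A sketch of the logical-window version, replacing ROWS with RANGE and using a value of 1000 on salary:

    SELECT first_name, salary,
           FIRST_VALUE(salary) OVER (PARTITION BY job_id ORDER BY salary
                                     RANGE BETWEEN 1000 PRECEDING AND CURRENT ROW) AS previous_row,
           LAST_VALUE(salary)  OVER (PARTITION BY job_id ORDER BY salary
                                     RANGE BETWEEN CURRENT ROW AND 1000 FOLLOWING) AS next_row
    FROM   employees;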



Let’s first look at previous_row. For Luis, the previous_row window starts at his own row because there is no record before it, so 1000 PRECEDING has no effect; we took the first value, hence it is 6900. For Jose Manuel, the previous_row window starts from any salary within a range of 1000 below his (7800-1000=6800), which is again the first record, because 6900 falls within 6800 to 7800. We took the first value, hence it is 6900.
Now let’s look at next_row. For Luis, the next_row window starts at the current row and ends at any salary within a range of 1000 above his (6900+1000=7900), which is 7800, because 7800 falls within 6900 to 7900. Note that the next record shows 8200, which is beyond the range. We took the last value, hence it is 7800.
I hope you now get the meaning of a logical range. It is a numeric value if you order by a numeric column, and a date or numeric value if you order by a date column. If you order by a date column, a numeric value of 365 will create a range of one year.
Analytic functions are commonly used in data warehousing environments and are one of the most powerful tools at hand. It’s not an easy topic, so try it yourself and play with it for some time if you really want to grasp all of it.
Good luck.


Thursday, December 5, 2013

Pig Schema – Part 2

In my previous post, we learned about Pig scalar data types and complex data types. We saw examples of loading a tuple, and another example where we loaded a tuple which contains a map as a field. That was an example of a nested complex data type.
We are left with an example of loading an inner bag into a Pig relation. What is an inner bag? We already understand a bag: it’s a collection of tuples. So whenever we load data from our files, Pig creates several tuples and all of them are loaded into an outer bag. Look at the example below. Lines 1 and 2 load my file test-data.tsv into a relation named D; in this case D is an outer bag. Line 3 creates a new relation X by grouping D on the state column; X is again an outer bag. At line 5, when we dump X, the content of X is displayed on screen. Look at the last line of the output.
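A sketch of that script. The column names are my assumptions; the layout mirrors the line numbers referenced above, with the LOAD statement spread over the first two lines. The last line of the dump would look something like (Maharashtra,{(...),(...)}), i.e. the state followed by an inner bag of two tuples.

    D = LOAD 'test-data.tsv'
        AS (name:chararray, city:chararray, state:chararray);
    X = GROUP D BY state;

    DUMP X;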
The first column value is Maharashtra and the second column is enclosed in {}. That entire second column is an inner bag. This inner bag holds two tuples, separated by a comma. I have saved this output into a file and we will load it back using a new script.

Look at the example below to learn how we load an inner bag from our file. It’s simple and needs no further explanation.
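A sketch of such a script; the file name and field names are my assumptions.

    B = LOAD 'grouped-data.txt'
        AS (state:chararray, members:bag{t:tuple(name:chararray, city:chararray, state:chararray)});
    DUMP B;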
I will summarize the complex data types in the table below.
We have seen that when we load data from a source file, the format of the source data is also important. When we loaded map data, the data was encoded in [key#value] format. When we loaded an inner bag, the data was encoded in {(f1,f2,…),(f1,f2,…)} format.
You might wonder whether we really get data encoded so neatly in Pig schema notation. Real-life data comes in all varieties. You may get data files encoded in Pig schema notation, but you may also get completely unstructured data, semi-structured data, or very well structured data that is not in Pig schema notation: it may be in some other notation such as JSON or XML, or it may live in an HBase table or some other format. That is one of the top challenges in data analysis, and any tool claiming to be a data analysis tool has to deal with it. So the question is: how does Pig handle this challenge?
This role is played by Load and Store functions. Load/Store functions determine how data goes into Pig and comes out of Pig. If you remember the examples where we loaded a comma separated file, we used the PigStorage function to tell Pig that the data file was comma separated. PigStorage is the default load function for Pig. As per the Pig 0.12 documentation, there are 8 such built-in functions to support various source data formats. We will see some examples of their usage, but what if your data is in a format not supported by these 8 functions? You can write your own function and use it in your Pig script to load your custom-formatted data. But if your data is in an industry-standard format, or your schema is stored in HCatalog, you will find such Load/Store functions already developed and shared by someone in the open source community. I will write about HCatalog separately, but if you need it now, you can get information about HCatLoader here.
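For reference, a sketch of the kind of comma separated load mentioned above; the file name and fields are assumed.

    A = LOAD 'data.csv' USING PigStorage(',') AS (id:int, name:chararray);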
Now it’s time for an example of a load function other than the default PigStorage. I will show you how to use JsonLoader to load data from a JSON data file.
For this example, we need a simple JSON data file, shown below.
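A sketch of such a file, one JSON record per line; the field names and values are my assumptions.

    {"name":"Ravi","city":"Mumbai","state":"Maharashtra"}
    {"name":"Amit","city":"Pune","state":"Maharashtra"}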
JsonLoader expects a schema definition file named .pig_schema in the input directory. The screen below shows the content of that file for our example.
Finally, the script below loads the JSON formatted data.
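A sketch of that script; the file name is my assumption, and with no arguments JsonLoader picks up the schema from the .pig_schema file in the input directory.

    A = LOAD 'employees.json' USING JsonLoader();
    DUMP A;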
Keep reading, keep learning.

Good Luck.