Hive

I will give a basic overview of what Hive is, and then we will get started with the different types of tables and queries in Hive. The queries are deliberately close to SQL, since Hive was intended to be an abstraction layer that makes MapReduce easier to use.

Hive is a data warehouse that sits on top of the Hadoop file system. We define a schema for data that already resides in HDFS, and these schemas are saved in the metastore. By default a Derby database stores the metastore; if we want multiple concurrent sessions, we can use a MySQL database as the metastore instead. In the background the data lives in files in HDFS, so when we create a table we are really just creating a schema over that existing data. This is why Hive is described as schema-on-read: we use the schema we defined to read the data stored in the files.

Now, when we run queries, we are actually running MapReduce jobs in the background; Hive converts the queries into MapReduce jobs, so Hive is an abstraction on top of MapReduce. You can also switch the execution engine from MapReduce to Spark jobs by changing the configuration. The concepts above should be enough to get you started writing queries and creating tables. You will pick up the minor nuances while reading the rest of the article and writing queries.
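
As a hedged sketch, the execution engine can be switched at the session level with the following property (Tez or Spark must be installed and configured for values other than the default to work):

>set hive.execution.engine=spark;
–accepted values are mr, tez, and spark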

Creating Databases and Tables-

You can create databases in Hive similar to how we do in SQL systems. After you create a database, all of its tables are actually folders under the database folder.

>show databases;
>create database if not exists db1 comment 'this is db1';
>describe database db1;
–if you use the keyword extended, you will get all the dbproperties attached to that database
>describe database extended db1;

Once you create a database, you need to specify which database should be used:

>use db1;

If you think there are other databases you might want to use, you can use the show command to see all of them. Whichever database you have selected with use, your subsequent activities will apply to that database.

>create table if not exists table_name(col1 string, col2 array<string>, col3 int, col4 string) row format delimited fields terminated by ',' collection items terminated by ':' lines terminated by '\n' stored as textfile location '/user/munjesh/table_test';

The above line creates a table named table_name (only if it does not already exist) with 4 columns whose data types are given explicitly. Column 2 is a bit different: it is an array of strings. Because we specified row format delimited, each record is a delimited row. The fields terminated by clause means the columns are separated by commas, and collection items terminated by ':' means the array values are separated by colons. Lines are terminated by newline characters, so whenever Hive finds a newline it treats it as a new record. The data is stored as a text file; there are several other formats we can store the data in, which we will discuss later.

>describe formatted table_name;
–this will provide the details of the table, including where it resides on HDFS.

Loading Data

Loading data into a table can also be done from the command line. We have the option to either append the data or overwrite it as per our requirement. We can also specify whether we want to load data from the local file system or from HDFS.

>load data local inpath '/home/munjesh/data.txt' into table table1;
#The above statement uses the word local, which tells Hive that we want to load the data from our local file system rather than HDFS.
#Then we specify the word 'into', which means we want to append the data to the existing data. If we want to overwrite the data, we use the word 'overwrite' in place of 'into'.
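
For comparison, here is a hedged sketch of loading from HDFS and overwriting the existing data (the path is only a placeholder):

>load data inpath '/user/munjesh/data.txt' overwrite into table table1;
#no 'local' keyword, so the path is read from HDFS; 'overwrite' replaces any data already in table1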

Loading data using insert

>insert into table table1 select col1, col2, col3 from emp_tab;
#Again into will append the rows and overwrite will remove existing data before loading.
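
The overwrite variant would look like this (emp_tab is the same source table as in the query above):

>insert overwrite table table1 select col1, col2, col3 from emp_tab;
#existing rows in table1 are removed before the selected rows are written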

External and Internal (or Managed) –

By default, when you create a table it is internal (managed). Internal or managed means that Hive owns both the table and the data residing in HDFS, so you can drop and update them through Hive. If you make the table external, Hive no longer owns the underlying data: dropping the external table only removes the schema from the metastore and does not touch the data in HDFS. If the table were internal or managed, dropping it would remove the schema and the data as well. So, when you want the data to be safe and do not want to give people the ability to drop it, make the table external.

–to create an external table, just mention external
>create external table if not exists table_name(col1 string, col2 array<string>, col3 int, col4 string) row format delimited fields terminated by ',' collection items terminated by ':' lines terminated by '\n' stored as textfile;

Sorting

ORDER BY – This is the kind of sorting where all the data is passed through a single reducer, so you get a fully sorted data set. But if you are in strict mode, it is mandatory to add a LIMIT to the query, because a single reducer would take a very long time to finish over a large data set.
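
A hedged example of ORDER BY with the LIMIT that strict mode requires (table1, col1, and the limit value are only illustrative):

>select col1 from table1 order by col1 limit 100;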

SORT BY – This is a kind of sorting where the data is passed through multiple reducers. The result is a data set that is not fully sorted: each reducer sorts only its own portion of the data, and the final output is the combination of those per-reducer sorted outputs.
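
A minimal SORT BY sketch with the same illustrative names; each reducer sorts only the rows it receives:

>select col2 from table1 sort by col2;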

DISTRIBUTE BY – This distributes the rows so that one set of values goes to one reducer and another set goes to a different reducer. Distribute by does not do any sorting on its own; it just creates non-overlapping data sets.

To sort with multiple reducers, we have to use DISTRIBUTE BY and SORT BY together. This still does not give a fully sorted result, but it can be used as per requirement.

>select col2 from table1 distribute by col2 sort by col2;

CLUSTER BY – This is a combination of DISTRIBUTE BY and SORT BY on the same column.
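
So the distribute by/sort by query above could also be written with CLUSTER BY:

>select col2 from table1 cluster by col2;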

Conditional statements

IF – select if(col1='mgr',col1,col2) => if(condition, value when condition is true, value when condition is false)

CASE – CASE when col1='mgr' then 'Manager' when col1='emp' then 'Employee' else 'NA' END

ISNULL- select isnull(col1) from table1; # return true for each row when value is null, else false.

ISNOTNULL – select isnotnull(col1) from table1;

COALESCE – select coalesce(col1,col2,col3) from table1; # this selects the first non null value;

NVL – select nvl(col1,col2) from table1; # selects col2 if col1 is null
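
Putting these together, a hedged example of a full query that uses CASE as a column expression (table1 and col1 are the same illustrative names as above):

>select col1, case when col1='mgr' then 'Manager' when col1='emp' then 'Employee' else 'NA' end as role from table1;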

Bucketing and Partitioning –

The data in HDFS is at a huge scale, so to optimize our queries we use bucketing and partitioning. Let us start with partitions.

Partitioning –

Partitioning is a technique where we create a folder for each logical partition of the data. For example, if we have a huge amount of data organized by year, it is better to partition it by year so that when we query a particular year, the system does not have to scan the year column and spend compute just to filter on it. With a partition, it goes directly to that year's folder, saving a lot of time and computing. There are two types of partitioning: static and dynamic. Static is when we explicitly name the partition we are loading; dynamic is when we specify a column to partition on and the partitions are created automatically from that column's values. There is no difference in how the table itself is created; it is how the data is loaded into the partitions that determines whether the load is static or dynamic. Let us take a look at how the partitions are made.

–Creating a partitioned table
>create table dept_part(col11 string, col21 string) partitioned by(col31 string) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile;
–here we have created a table dept_part which is partitioned on col31
>insert into table dept_part partition (col31='HR') select col1, col3 from dept where col2='HR';
–here we have loaded data into the 'HR' partition from the dept table, taking the rows where col2 is 'HR'
–Now if we wanted dynamic partitions, such that a partition is created for every unique value in col2 of the dept table (the source table), we would write the queries the following way.
>set hive.exec.dynamic.partition=true;
>set hive.exec.dynamic.partition.mode=nonstrict;
>insert into table dept_part partition (col31) select col1, col3, col2 from dept;
–so as we see above, col31 of dept_part is the partition column, and every unique value of col2 in the dept table becomes a partition. One thing to keep in mind is that when you insert data dynamically, the last column in the select list is treated as the partition value.
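
To verify what was created, we can list the partitions of the table:

>show partitions dept_part;
–each unique value of col2 should show up as a col31=value folder under the table's directory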

Bucketing –

While partitioning works at the directory level, bucketing (or clustering) works at the file level. Suppose we have a column with a limited set of values, like countries: we can partition on it, but smaller countries will have little data while bigger countries will have a lot, so the partitioning alone does not optimize our query. In that case we add bucketing, where rows are assigned to a fixed number of buckets based on a hash of the bucketed column, so specific values always go to one bucket. For example, if we bucket on state, all the rows for one state will always go to the same file. And because the metadata knows which bucket a particular state resides in, joins on state become very fast.

>set hive.enforce.bucketing=true;
>set hive.exec.dynamic.partition.mode=nonstrict;
>create table if not exists dept_buck (dept_no int, sal int, location string) partitioned by (deptname string) clustered by (location) into 4 buckets row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile;
–As you see above, in addition to the partition we have clustered location into 4 buckets, so the locations will be divided across 4 files and a given location will never appear in more than one of them; the buckets are exclusive.
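
Loading such a table is normally done with an insert-select so that the hashing into buckets is applied; a hedged sketch assuming a source table dept with matching columns:

>insert into table dept_buck partition (deptname) select dept_no, sal, location, deptname from dept;
–the partition column (deptname) goes last in the select list, and rows are hashed on location into the 4 bucket files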

Different File types –

Apart from the text file format we have used so far, there are several other file formats. The reason for the different file types is the different compression and encoding they support.

I have found the Parquet file format to be the most popular and most widely used, so I will describe Parquet here; you can research the other file types on your own.

  • TEXTFILE
  • SEQUENCEFILE
  • RCFILE – Record Columnar file – column oriented storage
  • ORCFILE – most optimized
  • Parquet – This is a columnar file format, which is leveraged to improve storage efficiency and query performance. Since we pay for storage and processing, our plan should be to reduce I/O and processing time so that we pay less on most cloud platforms (like Athena). Saving data as Parquet reduces its size compared to a CSV file, and it also takes less time to process because queries traverse the data in a columnar fashion. As the trend moves to serverless, we pay per query we run, as with AWS Athena, so it is better to use Parquet. My 2 cents. A minimal example of creating a Parquet-backed table follows this list.
  • Avro
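
As referenced in the Parquet bullet above, a hedged sketch of creating a Parquet-backed table (the table and column names are only illustrative):

>create table if not exists table_parq (col1 string, col3 int, col4 string) stored as parquet;
–row format and delimiter clauses are not needed here; Parquet manages its own columnar layout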

Querying in hive –

Querying in Hive is similar to SQL for basic cases, but when it comes to sorting, ordering, and grouping there are certain nuances you should know. There are also window functions, where you can define a window of data and apply functions over it.

Sorting in hive –

We use ORDER BY in SQL to sort, and we can do the same here, but since MapReduce jobs run in the background, an ORDER BY in Hive uses just one reducer to compare all the values together. This slows things down because only one reducer is doing the job. To let more reducers share the work, we use SORT BY. But there is a caveat: with SORT BY, each reducer can only compare the values it receives, so the result is not fully sorted.

To counter this problem, we have DISTRIBUTE BY. This sends each distinct value to a single reducer, so a particular value goes to only one reducer. We still do not get a fully sorted result, but there is a good amount of sorting in the result data set. It is up to your needs what kind of sorting you want to do.

Then we have CLUSTER BY, which is like using DISTRIBUTE BY and SORT BY together.