Hive

hive

Note: This post is specifically for my learning on the topic and to get warm-up at the time of interviews. Everything in the post I gathered from various books and websites, this is specifically for my understanding and to not for any business use.

Hadoop

Hadoop is an open-source framework to store and process Big Data in a distributed environment. It contains two modules, one is MapReduce and another is Hadoop Distributed File System (HDFS).

  • MapReduce: It is a parallel programming model for processing large amounts of structured, semi-structured, and unstructured data on large clusters of commodity hardware.
  • HDFS:Hadoop Distributed File System is a part of Hadoop framework, used to store and process the datasets. It provides a fault-tolerant file system to run on commodity hardware.

The Hadoop ecosystem contains different sub-projects (tools) such as Sqoop, Pig, and Hive that are used to help Hadoop modules.

  • Sqoop: It is used to import and export data to and from between HDFS and RDBMS.
  • Pig: It is a procedural language platform used to develop a script for MapReduce operations.
  • Hive: It is a platform used to develop SQL type scripts to do MapReduce operations.

Note: There are various ways to execute MapReduce operations:

  • The traditional approach using Java MapReduce program for structured, semi-structured, and unstructured data.
  • The scripting approach for MapReduce to process structured and semi structured data using Pig.
  • The Hive Query Language (HiveQL or HQL) for MapReduce to process structured data using Hive.

What is Hive

Hive is a data warehouse infrastructure tool to process structured data in Hadoop. It resides on top of Hadoop to summarize Big Data, and makes querying and analyzing easy.

Initially Hive was developed by Facebook, later the Apache Software Foundation took it up and developed it further as an open source under the name Apache Hive. It is used by different companies. For example, Amazon uses it in Amazon Elastic MapReduce.

Hive is not

  • A relational database
  • A design for OnLine Transaction Processing (OLTP)
  • A language for real-time queries and row-level updates

Features of Hive

  • It stores schema in a database and processed data into HDFS.
  • It is designed for OLAP.
  • It provides SQL type language for querying called HiveQL or HQL.
  • It is familiar, fast, scalable, and extensible.

Architecture of Hive

The following component diagram depicts the architecture of Hive:

hive_architecture

This component diagram contains different units. The following table describes each unit:

Unit Name Operation
User Interface Hive is a data warehouse infrastructure software that can create interaction between user and HDFS. The user interfaces that Hive supports are Hive Web UI, Hive command line, and Hive HD Insight (In Windows server).
Meta Store Hive chooses respective database servers to store the schema or Metadata of tables, databases, columns in a table, their data types, and HDFS mapping.
HiveQL Process Engine HiveQL is similar to SQL for querying on schema info on the Metastore. It is one of the replacements of traditional approach for MapReduce program. Instead of writing MapReduce program in Java, we can write a query for MapReduce job and process it.
Execution Engine The conjunction part of HiveQL process Engine and MapReduce is Hive Execution Engine. Execution engine processes the query and generates results as same as MapReduce results. It uses the flavor of MapReduce.
HDFS or HBASE Hadoop distributed file system or HBASE are the data storage techniques to store data into file system.

Working of Hive:

how_hive_works

 

The following table defines how Hive interacts with Hadoop framework:

Step No. Operation
1 Execute QueryThe Hive interface such as Command Line or Web UI sends query to Driver (any database driver such as JDBC, ODBC, etc.) to execute.
2 Get PlanThe driver takes the help of query compiler that parses the query to check the syntax and query plan or the requirement of query.
3 Get MetadataThe compiler sends metadata request to Metastore (any database).
4 Send MetadataMetastore sends metadata as a response to the compiler.
5 Send PlanThe compiler checks the requirement and resends the plan to the driver. Up to here, the parsing and compiling of a query is complete.
6 Execute PlanThe driver sends the execute plan to the execution engine.
7 Execute JobInternally, the process of execution job is a MapReduce job. The execution engine sends the job to JobTracker, which is in Name node and it assigns this job to TaskTracker, which is in Data node. Here, the query executes MapReduce job.
7.1 Metadata OpsMeanwhile in execution, the execution engine can execute metadata operations with Metastore.
8 Fetch ResultThe execution engine receives the results from Data nodes.
9 Send ResultsThe execution engine sends those resultant values to the driver.
10 Send ResultsThe driver sends the results to Hive Interfaces.

 

  1. Fundamentals of Hive?
  2. 1. Hive Thrift server is used to provide Thrift client for communication with JDBC, ODBC etc.
  3. Insert Overwrite is supported in Hive.
  4. Multitable insert is supported in hive.
  5. We can convert 1 datatype to another datatype using casting ex: CAST(S AS Int)
  6. While loading data to hive tables, Hive never validate the data based on schema we created, it simple moves the data to respected type tables(Managed or external).

 

  1. Hive Architecture?
  2. https://www.guru99.com/introduction-hive.html

 

  1. What is Thrift?
  2. Apache Thrift is a software framework for scalable cross-language services development, which combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Perl, C#, JavaScript, Node.js and other languages.

 

  1. Different Types of Hive environment modes.
  2. Emmbeded mode.

Local Mode-

Remote  Mode- Hive Drive, Metastore and RDBMS all runs in different nodes of the Cluster environment, which best practice for production.

 

  1. Compression between RDBMS and Hive

 

 

  1. Datatypes of Hive?
  2. Tiny Int, Small Int, Int, Big Int, Float, boolean, String,timestamp

Collection Datatypes:  array, Map, Struct

Struct name : name {fname String, last String}. To access >> name.fname

 

  1. Hive tables.
  2. Hive have 2 types of tables:
  3. Managed tables:
  • It is also know an internal table. When we create a table in Hive, it by default manages the data. This means that Hive moves the data into its warehouse directory.
    • Ex: CREATE TABLE managed_table (dummy STRING);
    • LOAD DATA INPATH ‘/user/tom/data.txt’ INTO table managed_table;
  • if we drop the table, Then this will delete the table metadata including its data. The data no longer exists anywhere. This is what it means for HIVE to manage the data.
    • Ex: DROP TABLE managed_table
  • Hive solely controls the Managed table security. Within Hive, security needs to be managed; probably at the schema level (depends on organization).
  1. External tables:
  • It tells Hive to refer to the data that is at an existing location outside the warehouse directory. So it doesn’t move data to its warehouse directory. It does not even check whether the external location exists at the time it is defined. This very useful feature because it means we create the data lazily after creating the table.
    • Ex: CREATE EXTERNAL TABLE external_table(dummy STRING) LOCATION ‘/user/tom/external_table’;
    • LOAD DATA INPATH ‘/user/tom/data.txt’ INTO TABLE external_table;
  • when we drop an external table, Hive will leave the data untouched and only delete the metadata.
  • These tables files are accessible to anyone who has access to HDFS file structure. So, it needs to manage security at the HDFS file/folder level.
  • When you want to create multiple schema tables with same data with different datatype for the columns then we can go with External table bcz this will not duplicate the data by creating multiple table and just create schema which is metadata managed by hive.

 

  1. Partitioning and bucketing.
  2. Bucketing:
  3. To Enhance or dividing the table datasets into more manageable parts, Apache Hive offers another technique. That technique is what we call Bucketing.
  4. when are partitioning our tables based geographic locations like country. Hence, some bigger countries will have large partitions (ex: 4-5 countries itself contributing 70-80% of total data). While small countries data will create small partitions (remaining all countries in the world may contribute to just 20-30 % of total data). Hence, at that time Partitioning will not be ideal.
  5. This is based on hashing function on the bucketed column. Along with mod (by the total number of buckets):
  6. Where the hash_function depends on the type of the bucketing column.
  7. However, the Records with the same bucketed column will always be stored in the same bucket.
  8. Moreover,  to divide the table into buckets we use CLUSTERED BY clause.
  9. Generally, in the table directory, each bucket is just a file, and Bucket numbering is 1-based.
  10. Along with Partitioning on Hive tables bucketing can be done and even without partitioning.
  11. Moreover, Bucketed tables will create almost equally distributed data file parts.
  12. Creating :

CREATE TABLE bucketed_user(firstname VARCHAR(64),lastname VARCHAR(64),address STRING,city VARCHAR(64),state VARCHAR(64),post STRING,phone1 VARCHAR(64),phone2 STRING,email STRING,web STRING) COMMENT ‘A bucketed sorted user table’PARTITIONED BY (country VARCHAR(64)) CLUSTERED BY (state) SORTED BY (city) INTO 32 BUCKETS STORED AS SEQUENCEFILE;

  1. Advantages of Bucketing in Hive:
  2. On comparing with non-bucketed tables, Bucketed tables offer the efficient sampling.
  3. Map-side joins will be faster on bucketed tables than non-bucketed tables, as the data files are equal sized parts.
  4. Bucketed tables offer faster query responses than non-bucketed tables as compared to  Similar to partitioning.
  5. This concept offers the flexibility to keep the records in each bucket to be sorted by one or more columns.
  6. Insert Data into bucketed table:

If you want to convert the non-bucketed table to bucketed one then we need to enable following property in hife-config.xml

hive.enforce.bucketing=true;

INSERT OVERWRITE TABLE bucketed_user PARTITION (country) SELECT firstname, lastname, address , city, state, post, phone1, phone2, email, web, country FROM temp_user;

  1. Some points are important to Note:
  2. However, in partitioning the property hive.enforce.bucketing = true is similar to hive.exec.dynamic.partition=true property. So, we can enable dynamic bucketing while loading data into hive table By setting this property.
  3. Moreover, it will automatically set the number of reduce tasks to be equal to the number of buckets mentioned in the table definition (for example 32 in our case). Further, it automatically selects the clustered by column from table definition.
  4. Also, we have to manually convey the same information to Hive that, number of reduce tasks to be run (for example in our case, by using set mapred.reduce.tasks=32) and CLUSTER BY (state) and SORT BY (city) clause in the above INSERT …Statement at the end since we do not set this property in Hive Session.

 

Partitioning:

  1. Partitioning is a way of dividing a table into related parts based on the values of particular columns like date, city, and department.
  2. Creating:

CREATE TABLE table_name (column1 data_type, column2 data_type) PARTITIONED BY (partition1 data_type, partition2 data_type,….);

  1. Table Samples.
  2. 1. Table sample helps to fetch sample data from hugh high volume of data, which reduces the performance impact.
  3. Best use is to use the table sample with bucketing so that it will get the sample data from the specific but instead of scanning the entire table, which can impact the performance.
  4. Column name should be provided while doing sampling.
  5. Syntax: Select * from tableName tablesamples(bucket 3 out of 32) on rand()

from 3 bucket the data will be picketed.

If suppose we give tablesamples(bucket 3 out of 16) on id.

Then it does 32/16=2, so hive will pick 3 bucket data from 1 to 16 bucket and 19 bucket from 17 to 32 buckets, and return the sample data.

If suppose we give tablesamples(bucket 3 out of 62) on id

Then it does 32/64= 1/2, so hive will pick 3 bucket from 1 to 32 bucket and return first half rows of the 3 bucket as sample data.

  1. rand() will return entire row.

 

  1. Hive different file formats & Record format:
  2. File formats:
  3. Text File: Text file format is a default storage format. You can use the text format to interchange the data with other client application. The text file format is very common most of the applications. Data is stored in lines, with each line being a record. Each lines are terminated by a newline character (\n).

Create a TEXT file by add storage option as ‘STORED AS TEXTFILE’ at the end of a Hive CREATE TABLE command.

Ex: Create table textfile_table (column_specs) stored as textfile;

  1. Sequence File: Sequence files are Hadoop flat files which stores values in binary key-value pairs. The sequence files are in binary format and these files are able to split. The main advantages of using sequence file is to merge two or more files into one file.

Create a sequence file by add storage option as ‘STORED AS SEQUENCEFILE’ at the end of a Hive CREATE TABLE command.

Ex: Create table textfile_table (column_specs) stored as sequencefile;

  1. RC File: RCFile is row columnar file format. This is another form of Hive file format which offers high row level compression rates. If you have requirement to perform multiple rows at a time then you can use RCFile format.The RCFile are very much similar to the sequence file format. This file format also stores the data as key-value pairs.

Create RCFile by specifying ‘STORED AS RCFILE’ option at the end of a CREATE TABLE Command:

Ex: Create table textfile_table (column_specs) stored as rcfile;

  1. AVRO File: AVRO is open source project that provides data serialization and data exchange services for Hadoop. Avro is a row-based storage format for Hadoop.

Ex: Create table textfile_table (column_specs) stored as avro;

  1. ORC File: ORC file stands for Optimized Row Columnar file format. The ORC file format provides a highly efficient way to store data in Hive table. This file system was actually designed to overcome limitations of the other Hive file formats. The Use of ORC files improves performance when Hive is reading, writing, and processing data from large tables.

Ex: Create table textfile_table (column_specs) stored as orc;

  1. Parquet File: Parquet is a column-oriented binary file format. The parquet is highly efficient for the types of large-scale queries. Parquet is especially good for queries scanning particular columns within a particular table.

Ex: Create table textfile_table (column_specs) stored as parquet;

  1. Record format

 

  1. Sharde in hive.
  2. Sharde is library, which seats on Hadoop to store and read data from one file format to another file format.

 

  1. Job exectution flow
  2. The data flow in Hive behaves in the following pattern;
  3. Executing Query from the UI( User Interface)
  4. The driver is interacting with Compiler for getting the plan. (Here plan refers to query execution) process and its related metadata information gathering
  5. The compiler creates the plan for a job to be executed. Compiler communicating with Meta store for getting metadata request
  6. Meta store sends metadata information back to compiler
  7. Compiler communicating with Driver with the proposed plan to execute the query
  8. Driver Sending execution plans to Execution engine
  9. Execution Engine (EE) acts as a bridge between Hive and Hadoop to process the query. For DFS operations.
  10. EE should first contacts Name Node and then to Data nodes to get the values stored in tables.
  11. EE is going to fetch desired records from Data Nodes. The actual data of tables resides in data node only. While from Name Node it only fetches the metadata information for the query.
  12. It collects actual data from data nodes related to mentioned query
  13. Execution Engine (EE) communicates bi-directionally with Meta store present in Hive to perform DDL (Data Definition Language) operations. Here DDL operations like CREATE, DROP and ALTERING tables and databases are done. Meta store will store information about database name, table names and column names only. It will fetch data related to query mentioned.
  14. Execution Engine (EE) in turn communicates with Hadoop daemons such as Name node, Data nodes, and job tracker to execute the query on top of Hadoop file system
  15. Fetching results from driver
  16. Sending results to Execution engine. Once the results fetched from data nodes to the EE, it will send results back to driver and to UI ( front end)

 

  1. Map-Side Join and Bucket-Map Join and Sort Merge Bucket(SMB) Map Join (https://acadgild.com/blog/map-side-joins-in-hive)
  2. Map-Side Join:
  3. Map side join is a process where joins between two tables are performed in the Map phase without the involvement of Reduce phase.
  4. Map-side Joins allows a table to get loaded into memory ensuring a very fast join operation, performed entirely within a mapper and that too without having to use both map and reduce phases.
  5. By specifying the keyword, /*+ MAPJOIN(b) */ in the join statement.
  6. By setting the following property to true: hive.auto.convert.join=true
  7. For performing Map-side joins, there should be two files, one is of larger size and the other is of smaller size. You can set the small file size by using the following property:  hive.mapjoin.smalltable.filesize=(default it will be 25MB)
  8. SELECT /*+ MAPJOIN(dataset2) */ dataset1.first_name, dataset1.eid,dataset2.eid FROM dataset1 JOIN dataset2 ON dataset1.first_name = dataset2.first_name;

 

Bucket-Map Join:

  1. If tables being joined are bucketed on the join columns, and the number of buckets in one table is a multiple of the number of buckets in the other table, the buckets can be joined with each other.
  2. For suppose if one table has 2 buckets then the other table must have either 2 buckets or a multiple of 2 buckets (2, 4, 6, and so on). Further, since the preceding condition is satisfied then the joining can be done on the mapper side only. Else a normal inner join is performed.
  3. Ex:

CREATE TABLE IF NOT EXISTS dataset1_bucketed ( eid int,first_name String, last_name String, email String, gender String, ip_address String) clustered by(first_name) into 4 buckets row format delimited fields terminated BY ‘,’;

CREATE TABLE IF NOT EXISTS dataset2_bucketed (eid int,first_name String, last_name String) clustered by(first_name) into 8 buckets row format delimited fields terminated BY ‘,’ ;

insert into dataset1_bucketed select * from dataset1;

insert into dataset2_bucketed select * from dataset2;

 

  1. For performing Bucket-Map join, we need to set this property in the Hive shell.

set hive.optimize.bucketmapjoin = true

 

SELECT /*+ MAPJOIN(dataset2_bucketed) */ dataset1_bucketed.first_name,dataset1_bucketed.eid, dataset2_bucketed.eid FROM dataset1_bucketed JOIN dataset2_bucketed ON dataset1_bucketed.first_name = dataset2_bucketed.first_name ;

 

Sort Merge Bucket(SMB) Map Join:

  1. If the tables being joined are sorted and bucketized on the join columns and have the same number of buckets, a sort-merge join can be performed. The corresponding buckets are joined with each other at the mapper.
  2. For performing the SMB-Map join, we need to set the following properties:

Set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

set hive.optimize.bucketmapjoin = true;

set hive.optimize.bucketmapjoin.sortedmerge = true;

  1. To perform this join, we need to have the data in the bucketed tables sorted by the join column. Now, we will re-insert the data into the bucketed tables by using sorting the records.

insert overwrite table dataset1_bucketed select * from dataset1 sort by first_name;

insert overwrite table dataset2_bucketed select * from dataset2 sort by first_name;

  1. To perform SMB-Map join, we need to have the same number of buckets in both the tables with the bucketed column sorted.

Now, we will create another table for dataset2 having 4 buckets and will insert the data that is sorted by first_name.

CREATE TABLE IF NOT EXISTS dataset2_bucketed1 (eid int,first_name String, last_name String) clustered by(first_name) into 4 buckets row format delimited fields terminated BY ‘,’ ;

insert overwrite table dataset2_bucketed1 select * from dataset2 sort by first_name;

SELECT /*+ MAPJOIN(dataset2_sbucketed1) */dataset1_bucketed.first_name, dataset1_bucketed.eid, dataset2_bucketed1.eid FROM dataset1_bucketed JOIN dataset2_bucketed1 ON dataset1_bucketed.first_name = dataset2_bucketed1.first_name ;

 

  1. Hive Syntax:  A. Create DB:

CREATE DATABASE|SCHEMA [IF NOT EXISTS] <database name>

EX: CREATE DATABASE [IF NOT EXISTS] userdb;

  1. Drop DB:

DROP DATABASE IF EXISTS userdb;

  1. Create table:

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name

(col_name data_type [COMMENT ‘col_comment’], …)

[PARTITIONED BY (col_name data_type [COMMENT ‘col_comment’], …)]

[COMMENT ‘table_comment’]

[WITH SERDEPROPERTIES (‘key1’=’value1’, ‘key2’=’value2’, …)]

[

[ROW FORMAT row_format] [STORED AS file_format]

]

[LOCATION ‘hdfs_path’]

[TBLPROPERTIES (‘key1’=’value1’, ‘key2’=’value2’, …)]

[CACHED IN ‘pool_name’ [WITH REPLICATION = integer] | UNCACHED]

16. Difference between SORT BY vs ORDER BY vs DISTRIBUTE BY vs CLUSTER BY?
A. 1. SORT BY: Hive uses the columns in SORT BY to sort the rows before feeding the rows to a reducer. The sort order will be dependent on the column types. If the column is of numeric type, then the sort order is also in numeric order. If the column is of string type, then the sort order will be lexicographical order.
2. ORDER BY: ORDER BY guarantees total ordering of data, but for that it has to be passed on to a single reducer, which is normally performance impact and therefore in strict mode, hive makes it compulsory to use LIMIT with ORDER BY so that reducer doesn’t get overburdened.
3. DISTRIBUTE BY: It ensures each of N reducers gets non-overlapping ranges of column, but doesn’t sort the output of each reducer. You end up with N or more unsorted files with non-overlapping ranges.
4. CLUSTER BY: Cluster By is a short-cut for both Distribute By and Sort By. ensures each of N reducers gets non-overlapping ranges, then sorts by those ranges at the reducers.

Design a site like this with WordPress.com
Get started