Stages in Hive

A Hive job consists of one or more stages, with dependencies between them. As you might expect, more complex queries usually involve more stages, and more stages usually require more processing time to complete.

A stage can be a MapReduce job, a sampling stage, a merge stage, a limit stage, or a stage for some other task Hive needs to do. By default, Hive executes these stages one at a time. However, a particular job may consist of stages that are not dependent on each other; such stages can be executed in parallel, possibly allowing the overall job to complete more quickly.
Setting hive.exec.parallel to true enables parallel execution. Note that a job running more stages in parallel will also increase its cluster utilization:

<property>
  <name>hive.exec.parallel</name>
  <value>true</value>
  <description>Whether to execute jobs in parallel</description>
</property>
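The same behavior can also be toggled per session from the Hive CLI. A minimal sketch (hive.exec.parallel.thread.number is the companion property that caps how many stages run concurrently, per the standard Hive configuration; the value shown is its usual default):

set hive.exec.parallel=true;
-- optional: limit how many independent stages may execute at once
set hive.exec.parallel.thread.number=8;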
Hive converts a query into one or more stages, such as a MapReduce stage or a move task stage. If a stage fails, Hive cleans up the process and reports the error. If a stage succeeds, Hive executes subsequent stages until the entire job is done. Also, multiple Hive statements can be placed in an HQL file, and Hive will execute each query in sequence until the file is completely processed.
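For example, a file containing several statements can be executed end to end with hive -f (the file name and table below are illustrative):

-- script.hql: Hive runs each statement in sequence
CREATE TABLE IF NOT EXISTS onecol (number INT);
SELECT SUM(number) FROM onecol;

$ hive -f script.hql

The stage plan examined next can be produced by prefixing a query with EXPLAIN; given the onecol table and number column that appear in the plan, the query was presumably:

EXPLAIN SELECT SUM(number) FROM onecol;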

The STAGE PLANS section is verbose and complex. Stage-1 is the bulk of the processing for this job and happens via a MapReduce job. A TableScan takes the input of the table and produces a single output column, number. The Group By Operator applies sum(number) and produces an output column _col0 (a synthesized name for an anonymous result). All of this happens on the map side of the job, under the Map Operator Tree:

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        onecol
          TableScan
            alias: onecol
            Select Operator
              expressions:
                    expr: number
                    type: int
              outputColumnNames: number
              Group By Operator
                aggregations:
                      expr: sum(number)
                bucketGroup: false
                mode: hash
                outputColumnNames: _col0
                Reduce Output Operator
                  sort order:
                  tag: -1
                  value expressions:
                        expr: _col0
                        type: bigint

On the reduce side, under the Reduce Operator Tree, we see the same Group By Operator, but this time it applies sum on _col0. Finally, in the reducer we see the File Output Operator, which shows that the output will be text, based on the string output format HiveIgnoreKeyTextOutputFormat:

Reduce Operator Tree:
  Group By Operator
    aggregations:
          expr: sum(VALUE._col0)
    bucketGroup: false
    mode: mergepartial
    outputColumnNames: _col0
    Select Operator
      expressions:
            expr: _col0
            type: bigint
      outputColumnNames: _col0
      File Output Operator
        compressed: false
        GlobalTableId: 0
        table:
            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Because this job has no LIMIT clause, Stage-0 is a no-op stage:
Stage: Stage-0
  Fetch Operator
    limit: -1

Understanding the intricate details of how Hive parses and plans every query is not useful all of the time. However, it is valuable when analyzing complex or poorly performing queries, especially as we try various tuning steps; we can observe what effect these changes have at the "logical" level, in tandem with performance measurements.
When you type a query through the CLI, the HiveQL statement is handled by the Driver component. The Driver connects a series of modules that transform the statement into MapReduce jobs to be run on Hadoop. It is important to note that the query is not translated into Java code in this process; it goes directly to MapReduce jobs. The modules involved are the Parser, Semantic Analyzer, Logical Plan Generator, Optimizer, Physical Plan Generator, and Executor.
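One convenient way to observe what these modules produce is EXPLAIN EXTENDED, which adds lower-level detail (such as file paths) to the plan; in Hive versions of this era, the EXPLAIN output also begins with the ABSTRACT SYNTAX TREE section emitted by the parser. Reusing the earlier example query:

EXPLAIN EXTENDED SELECT SUM(number) FROM onecol;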

Prior Support for MAPJOIN

Hive supports MAPJOINs, which are well suited to scenarios where one side of the join (for example, a dimension table) is small enough to fit in memory. A MAPJOIN can be invoked either through an optimizer hint:

select /*+ MAPJOIN(time_dim) */ count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
or via auto join conversion:

set hive.auto.convert.join=true;
select count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
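Whether auto conversion actually rewrites a join as a MAPJOIN depends on the estimated size of the smaller table. A typical session setup looks like the following (hive.mapjoin.smalltable.filesize is the standard property name; the byte value shown is its usual default):

set hive.auto.convert.join=true;
-- tables smaller than this many bytes are candidates for map-side joins
set hive.mapjoin.smalltable.filesize=25000000;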
MAPJOINs are processed by loading the smaller table into an in-memory hash map and matching keys against it as the larger table is streamed through. The work breaks down as follows:

Local work:
- read records via a standard table scan (including filters and projections) from the source on the local machine
- build a hashtable in memory
- write the hashtable to local disk
- upload the hashtable to DFS
- add the hashtable to the distributed cache

Map task:
- read the hashtable from local disk (distributed cache) into memory
- match records' keys against the hashtable
- combine matches and write to output

No reduce task.
Limitations of Current Implementation

The current MAPJOIN implementation has the following limitations:

- The mapjoin operator can only handle one key at a time; that is, it can perform a multi-table join, but only if all the tables are joined on the same key. Typical star schema joins do not fall into this category (see the sketch after this list).
- Hints are cumbersome for users to apply correctly, and auto conversion doesn't have enough logic to consistently predict whether a MAPJOIN will fit into memory.
- A chain of MAPJOINs is not coalesced into a single map-only job unless the query is written as a cascading sequence of mapjoin(table, subquery(mapjoin(table, subquery…. Auto conversion will never produce a single map-only job.
- The hashtable for the mapjoin operator has to be generated for each run of the query, which involves downloading all the data to the Hive client machine as well as uploading the generated hashtable files.
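To illustrate the first limitation, consider a hypothetical star-schema query (date_dim and its keys are illustrative TPC-DS-style names) in which the fact table joins each dimension on a different key, so a single mapjoin operator cannot cover both joins:

select count(*) from
store_sales
  join time_dim on (ss_sold_time_sk = t_time_sk)
  join date_dim on (ss_sold_date_sk = d_date_sk);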
