Hive - a petabyte scale data warehouse using Hadoop 6 minutes read | 1124 words by Ruben Berenguel
Hive is arguably old. It is also undoubtedly useful, even now: 10 years after it was introduced.
A decade ago things were very different. Petabyte scale was only the subject of few large companies, and you were restricted to store them in Hadoop and analyse them with MapReduce jobs, Apache Pig scripts and a couple more solutions (like HBase). Hive revolutionised data access by allowing SQL-ish access to big data, sometimes even interactively. Here I’ll present a summary of its initial paper:
I will use the present tense throughout this article for clarity, even though Hivetoday has somewhat different abilities. I will point out to some differences as well. For a more recent paper on Hive architecture and details, you can check this.
Hive is a data warehouse solution on top of Hadoop’s MapReduce execution system and HDFS storage. It allows querying data with HiveQL, a subset of SQL with custom extensions. Furthermore, accepts custom MapReduce scripts in queries.
The data model
The Hive data model should be already known by anyone reading this:
Databases, which contain,
Tables, which internally can be
Partitioned, by distributing data in sub-folders in HDFS, and additionally
Bucketed, by hashing specific columns.
Columns can be of primitive types (INT, STRING…), nestable types (arrays or maps of primitives or nestable types) and can also be custom types defined by the user. This data model is pretty much the one we have in Spark applications (and, just like with Spark, avoid bucketing, too complicated). Partitioning, in particular, is the same format you get when issuing a df.write.partitionBy(..cols).
The original implementation of Hive already has the concept of EXTERNAL TABLE, which has creeped into other data warehouses, even today.
Hive's architecture feels natural, or at least it is not a surprising architecture for a data warehouse (compare with Snowflake’s or even Delta Lake’s, and squint hard).
External interaction is covered by CLI access for interactive querying, a web UI and JDBC/ODBC connectors for database oriented clients. Internally, all these pieces connect with the driver via an Apache Thrift server. The driver is where queries are processed and prepared to be run as MapReduce tasks.
An internal metastore handles the catalog of metadata for tables stored in Hive. This metadata is written when creating tables, and is reused every time queries are executed or prepared.
Such a metastore can’t reside in HDFS: it needs random access and sequential scanning to work properly. Thus, it will be either stored in the local filesystem of the driver, or use a traditional relational database. Ideally, you’d add replication as well: this is the most important durable part.
Finally, the driver manages converting HiveQL queries into directed acyclic graphs (DAGs) of MapReduce jobs. In other words, this is where most of the magic happens.
HiveQL: The query language
As mentioned above, HiveQL is a subset of SQL with extensions (a sideset?). These extensions are roughly the following.
DDL (data definition language): extends SQL with information about SerDe (serialisation/deserialisation) information for columns, as well as information about partitioning, bucketing and storage format. It also extends creation with arbitrary key:value expressions, which I only ever remember using for column comments.
Multi-table inserts: This allows scanning once, writing many times. A kind of operation that doesn’t seem that common in normal databases, but is useful in big data cases. Or at least, it was deemed useful enough to be added as a special case by Hive.
UDFs/UDAFs (User Defined [Aggregate] Functions): These can be written in Java.
Embed MapReduce scripts: a basic row-by-row string interface can be passed to arbitrary MapReduce scripts. For people who have used Pig Latin, this is similar to the STREAM functionality. Horrible overhead though because you need to serialise rows to strings and back.
External tables: you can define and access external tables, which cannot be dropped. This allows accessing data that may be stored in HDFS, even in different formats, but querying via HiveQL.
The query compiler
This is the part I find more interesting, I like query analysers, optimisers and compilers.
As a starter, this converts strings into plans. Plans are not made equal, though:
In pure DDL string queries, plans are formed of only metadata operations.
When handling LOAD statements, plans are formed of only HDFS operations.
For inserts, or in general, queries, plans are formed by DAGs of MapReduce tasks. This is of course the fun part.
As expected, it all starts with a parser converting a query string to a parse tree.
The parse tree is picked by a semantic analyser. The SA converts the tree into an internal query representation, with entities resolved. That is, columns are confirmed to exist in tables, all typing checks, any automated casting is added. Also, * expressions are expanded.
A logical plan generator takes the internal query and creates a logical plan, which is just a tree of logical operations.
The logical tree is where we optimise queries. The original query optimiser is a multi-pass optimiser, with several typical optimisations and some MapReduce specific optimisations:
Combines multiple joins with the same key, which can be converted into 1 MapReduce task.
Adds repartition operators for JOIN, GROUP BY and custom MapReduce operators: these are markers for the boundaries between Map and Reduce when creating a physical plan.
Early pruning of columns, predicate pushdown and partition pruning.
Under certain kinds of sampling, bucket pruning.
The optimiser can take additional hints by the user, for instance, to trigger a map-side join (this is what a broadcast join looks like in MapReduce), or to repartition to prevent skewness.
Modern versions of Hive use Apache Calcite as a query optimiser. Calcite adds cost-based optimisation.
Finally, the physical plan generator converts the logical plan into a physical plan. A MapReduce task is created per marker operator in the logical plan, where a marker operator is either a repartition or a UNION ALL. Between markers, the queries will be assigned to mappers or reducers depending on what the operations are.
These MapReduce tasks form a DAG, and are sent in topological order (fancy way of saying: in the order required for everything to finish) to the execution engine. In the initial paper, the only execution engine is Hadoop, but later Tez was added as well, along with others. Nowadays, you would run Hive on Spark. Or maybe just use directly Spark.
Hive revolutionised big data, not only by allowing interactive querying but also by using a language that was in wide use, SQL. The alternative was Apache Pig and its query language, Pig Latin.