Apache Druid: analytical queries powered by magic
It has been a while since my previous data paper post. This time I tackle a lesser-known one.
I had actually read Druid’s documentation around a year and a half ago, and the paper around a year ago, but never found the time to write down what I learned. All in all, Druid seems very well designed for its use case: analytical queries on big datasets. Its architecture, though, is not for the faint of heart. I have tried to summarise the Druid paper and combine it with what is currently available in Druid’s documentation. Have fun.
Apache Druid is a data store designed for real-time exploratory analytics on large datasets. It is built on columnar data storage, advanced indexing structures and a shared-nothing architecture that lets every piece scale horizontally.
The kind of exploration where Druid excels is drill-downs and aggregations over dimension and metric columns.
The architecture of a Druid system is split into several types of processes. The nomenclature has been updated and has grown between the Druid paper and the current Druid documentation; I will point out the differences as we go through each piece.
In the paper, real-time processes are used to ingest and query event streams. These processes index events, which are then immediately available for querying. They handle short time ranges and periodically offload immutable batches to other processes. Incoming events are kept in memory, in indexes that are updated incrementally. These indexes are regularly flushed to disk in a columnar format. Periodically, each of these processes takes all the indexes it has persisted locally, merges them and builds a whole immutable block of data known as a segment. These segments are then persisted to colder storage (HDFS, S3, or some similar form of deep storage).
Time blocks are handled flexibly by adding (configurable) watermarks that wait for straggler events before closing previous segments. In this way, a real-time process keeps serving queries over its locally persisted and in-memory indexed events for a while.
These real-time event processors usually fetch data from a continuous data source, such as Apache Kafka.
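The lifecycle described above (incremental in-memory index, regular persists to disk, final merge into an immutable segment) can be pictured with a small conceptual sketch. This is purely illustrative Python, not Druid code; the class and method names are made up.

```python
# Purely illustrative sketch of the real-time ingestion lifecycle described above.
# Not Druid code: the class and method names are made up.

class RealtimeIndexer:
    def __init__(self):
        self.in_memory_index = []     # incrementally updated, immediately queryable
        self.persisted_indexes = []   # columnar spills persisted to local disk

    def ingest(self, event):
        """New events land in the in-memory index first."""
        self.in_memory_index.append(event)

    def persist(self):
        """Regularly flush the in-memory index to disk (here: just move it aside)."""
        if self.in_memory_index:
            self.persisted_indexes.append(list(self.in_memory_index))
            self.in_memory_index.clear()

    def hand_off(self):
        """Merge all locally persisted indexes into one immutable segment."""
        segment = [event for index in self.persisted_indexes for event in index]
        self.persisted_indexes.clear()
        return segment  # would be uploaded to deep storage (HDFS, S3, ...) and announced

    def query(self):
        """While the time block is open, queries see persisted and in-memory events."""
        persisted = [e for index in self.persisted_indexes for e in index]
        return persisted + self.in_memory_index
```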
In the current Druid architecture, the paper’s real-time processes correspond to middle manager processes, which assign tasks to peon processes running in separate JVMs to maintain proper resource isolation. Current middle manager/peon processes can roll up (pre-aggregate or summarise) data at ingestion time to avoid storing unnecessary detail, by configuring a coarser ingestion-time granularity. Note that this roll-up is enabled by default, and it groups rows by their dimension values and (truncated) timestamps.
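To make the ingestion side more concrete, here is a hedged sketch of what a Kafka ingestion supervisor spec with roll-up could look like, built in Python and submitted to the Overlord. The datasource, columns, topic, endpoint and port are assumptions for a local test setup; the field names follow recent Druid documentation, so double-check against the version you run.

```python
import requests  # assumes the requests library is installed

# Hypothetical Kafka ingestion spec with roll-up enabled. Field names follow
# recent Druid docs; datasource, columns, topic and ports are made up.
supervisor_spec = {
    "type": "kafka",
    "spec": {
        "dataSchema": {
            "dataSource": "page_events",
            "timestampSpec": {"column": "ts", "format": "iso"},
            "dimensionsSpec": {"dimensions": ["page", "country"]},
            "metricsSpec": [
                {"type": "count", "name": "events"},
                {"type": "longSum", "name": "bytes", "fieldName": "bytes"},
            ],
            "granularitySpec": {
                "segmentGranularity": "HOUR",  # how data is partitioned into segments
                "queryGranularity": "MINUTE",  # timestamps truncated before roll-up
                "rollup": True,
            },
        },
        "ioConfig": {
            "topic": "page-events",
            "consumerProperties": {"bootstrap.servers": "localhost:9092"},
        },
        "tuningConfig": {"type": "kafka"},
    },
}

# Submit to the Overlord, which hands the ingestion tasks to middle managers/peons.
resp = requests.post(
    "http://localhost:8081/druid/indexer/v1/supervisor",  # assumed local Overlord
    json=supervisor_spec,
)
print(resp.status_code, resp.text)
```

With roll-up enabled, rows that share the same minute-truncated timestamp and the same dimension values are collapsed into a single stored row with summed metrics, which is exactly the summarisation described above.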
Historical processes load and serve the immutable blocks that real-time processes have persisted. Most of the query workload in Druid is handled by these processes. They are purely shared-nothing, and thus can be scaled horizontally almost without limit, or at least as far as your cold storage can feed them when they request data. When a historical process has a local copy of a segment from cold storage, it announces in Zookeeper which segments it serves, so that queries for those segments can be routed efficiently. Note that historical processes always see consistent data, because they only deal with immutable blocks from cold storage, never anything fresh or updatable. These are still called historical processes in the current architecture.
Broker processes (usually on their own nodes) act as query routers to historical and real-time processes, based on the segment view available in Zookeeper. Queries keep working even if Zookeeper is lost: brokers cache the most recent state they have seen locally, so they can still route queries for segments they already know about. Brokers also cache query results (in a simple LRU, least-recently-used, cache). In current deployments this is split into the broker and an optional router process type that load-balances across the different types of processes and nodes.
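The broker cache the paper describes works at the level of per-segment results: only segments missing from the cache need to be fetched from historical or real-time processes, and the broker merges cached and freshly fetched partial results. Below is a toy illustration of that idea in Python, not Druid’s actual implementation.

```python
from collections import OrderedDict

class SegmentResultCache:
    """Toy LRU cache of per-segment partial results, keyed by (query, segment id)."""

    def __init__(self, max_entries=1024):
        self.max_entries = max_entries
        self.entries = OrderedDict()

    def get(self, query_key, segment_id):
        key = (query_key, segment_id)
        if key in self.entries:
            self.entries.move_to_end(key)  # mark as most recently used
            return self.entries[key]
        return None

    def put(self, query_key, segment_id, partial_result):
        self.entries[(query_key, segment_id)] = partial_result
        if len(self.entries) > self.max_entries:
            self.entries.popitem(last=False)  # evict the least recently used entry

def scatter(query_key, segment_ids, cache, fetch_from_process):
    """Only segments missing from the cache are fetched from historical/real-time processes."""
    partials = []
    for segment_id in segment_ids:
        result = cache.get(query_key, segment_id)
        if result is None:
            result = fetch_from_process(segment_id)  # routing step, stubbed out here
            cache.put(query_key, segment_id, result)
        partials.append(result)
    return partials  # the broker would merge these into the final response
```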
Coordinator processes manage the distribution of load across historical processes, telling them when to load or drop segments. Druid internally uses MVCC (multi-version concurrency control, the same scheme AWS Redshift uses). Coordinator processes use Zookeeper for leader election and maintain a view of the cluster state that they constantly update and persist to a MySQL (or similarly transactional, external) database. In the current naming these are still coordinator processes, but there are additionally overlord processes, which we can all agree is a cooler name: they are the ones controlling task assignment to middle manager and peon processes.
Load balancing of queries uses a form of CBO (cost-based optimisation), taking into account which real-time processes hold the relevant fresh data, which historical processes have the needed segments available, and which processes are currently busy with queries. Coordinator processes may also force a segment to be replicated across several historicals if it is very hot with queries.
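Replication of hot or recent data is also something an operator can steer through the coordinator’s load rules. A minimal sketch, assuming a local coordinator on its default port and using rule types from recent Druid documentation; the datasource name and retention periods are made up.

```python
import requests  # assumes the requests library is installed

# Keep the last month on two historicals, the last year on one, drop anything older.
# Rule types and field names follow recent Druid docs; treat them as assumptions.
rules = [
    {"type": "loadByPeriod", "period": "P1M", "tieredReplicants": {"_default_tier": 2}},
    {"type": "loadByPeriod", "period": "P1Y", "tieredReplicants": {"_default_tier": 1}},
    {"type": "dropForever"},
]

resp = requests.post(
    "http://localhost:8081/druid/coordinator/v1/rules/page_events",  # assumed local coordinator
    json=rules,
)
print(resp.status_code)
```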
Druid uses a custom columnar storage format. Data tables in Druid are called data sources, and are made up of timestamped events partitioned into segments. Each segment typically holds between 5 and 10 million rows. Segments are the smallest block Druid handles as a whole. Timestamps are fundamental for pruning queries down to the right segments, but the time granularity at which segments are partitioned depends on the nature of the data, from seconds to weeks or months. Every other column is handled with its own data type and compressed using the most appropriate encoding (dictionary, delta, general-purpose compression…).
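The pruning role of timestamps is easy to picture: every segment covers a known time interval, so a query only needs to touch segments whose interval overlaps the queried one. A tiny illustrative sketch (not Druid code), with hypothetical day-granularity segments:

```python
from datetime import datetime

# Hypothetical day-granularity segments: (segment id, interval start, interval end).
segments = [
    ("events_2023-01-01", datetime(2023, 1, 1), datetime(2023, 1, 2)),
    ("events_2023-01-02", datetime(2023, 1, 2), datetime(2023, 1, 3)),
    ("events_2023-01-03", datetime(2023, 1, 3), datetime(2023, 1, 4)),
]

def prune(segments, query_start, query_end):
    """Keep only the segments whose interval overlaps the queried interval."""
    return [segment_id for segment_id, start, end in segments
            if start < query_end and end > query_start]

# A query over January 2nd only touches one of the three segments.
print(prune(segments, datetime(2023, 1, 2), datetime(2023, 1, 3)))
```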
Druid handles string columns specially, by constructing lookup indices for the values found in them. These are bitmap indices, which are small and easy to compress further. They speed up queries on string columns by quickly determining which rows of a segment are actually needed.
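As an illustration of the technique (not Druid’s implementation, which uses compressed bitmap formats such as Concise or Roaring), a dictionary-encoded string column with one bitmap per distinct value looks roughly like this, and filters become bitwise AND/OR over those bitmaps:

```python
# Toy dictionary encoding plus bitmap index for a string column "country".
rows = ["SE", "SE", "US", "DE", "US", "SE"]

# Dictionary: each distinct value gets an integer id.
dictionary = {value: i for i, value in enumerate(sorted(set(rows)))}

# One bitmap per distinct value (here a plain int used as a bitset).
bitmaps = {value: 0 for value in dictionary}
for row_index, value in enumerate(rows):
    bitmaps[value] |= 1 << row_index

# WHERE country = 'SE' OR country = 'DE' -> OR the two bitmaps together.
matching = bitmaps["SE"] | bitmaps["DE"]
print([i for i in range(len(rows)) if matching & (1 << i)])  # rows 0, 1, 3, 5
```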
Finally, Druid’s query API is a custom query language encoded as JSON over POST requests. All brokers, historical and real-time processes understand this API, which is known as native queries. Currently there is also Druid SQL, which uses Apache Calcite in the brokers to translate SQL into native Druid queries. Responses are likewise JSON: blocks of aggregated metrics per timestamp and dimension. Importantly, there were no join queries in the Druid paper: it is designed purely for OLAP-style queries. Current Druid has some forms of join operations, but they are not “full” joins like what we’d expect from a relational system.
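To get a feel for both query APIs, here is a hedged sketch of the same aggregation as a native JSON query and as Druid SQL, sent to an assumed local broker; the datasource and columns are made up.

```python
import requests  # assumes the requests library is installed

broker = "http://localhost:8082"  # assumed local broker; datasource and columns are made up

# Native query: JSON over POST, the language brokers, historicals and
# real-time/ingestion processes speak among themselves.
native_query = {
    "queryType": "timeseries",
    "dataSource": "page_events",
    "granularity": "hour",
    "intervals": ["2023-01-01/2023-01-02"],
    "aggregations": [{"type": "longSum", "name": "bytes", "fieldName": "bytes"}],
}
print(requests.post(f"{broker}/druid/v2/", json=native_query).json())

# Druid SQL: translated by Apache Calcite in the broker into a native query.
sql_query = {"query": """
    SELECT TIME_FLOOR(__time, 'PT1H') AS "hour", SUM(bytes) AS bytes
    FROM page_events
    WHERE __time >= TIMESTAMP '2023-01-01' AND __time < TIMESTAMP '2023-01-02'
    GROUP BY 1
"""}
print(requests.post(f"{broker}/druid/v2/sql/", json=sql_query).json())
```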
All in all, Druid’s design looks sound. I have recently learned about the performance and capabilities of ClickHouse, which are essentially similar, but without an architectural whitepaper (or at least one I could find). Not knowing what the design ideas were makes me nervous.