\documentclass{acm_proc_article-sp}
\usepackage { graphicx}
\usepackage { balance}
\usepackage { fontspec}
\setmainfont [Ligatures={TeX}] { Times}
\usepackage { hyperref}
\graphicspath { { figures/} }
\hyphenation { metamarkets nelson}
\begin { document}
% ****************** TITLE ****************************************
\title { Druid: A Real-time Analytical Data Store}
% ****************** AUTHORS **************************************
\numberofauthors { 6}
\author {
\alignauthor Fangjin Yang, Eric Tschetter, Xavier Léauté, Nelson Ray, Gian Merlino, Deep Ganguli\\
\email { \{ fangjin, cheddar, xavier, nelson, gian, deep\} @metamarkets.com}
}
\date { 21 March 2013}
\maketitle
\begin { abstract}
Druid is an open
source\footnote { \href { https://github.com/metamx/druid} { https://github.com/metamx/druid} }
data store designed for real-time exploratory analytics on large data sets.
The system combines a column-oriented storage layout, a distributed,
shared-nothing architecture, and an advanced indexing structure to allow for
the arbitrary exploration of billion-row tables with sub-second latencies. In
this paper, we describe Druid's architecture, and detail how it supports fast
aggregations, flexible filters, and low latency data ingestion.
\end { abstract}
\section { Introduction}
In recent years, the proliferation of internet technology has
created a surge in machine-generated events. Individually, these
events contain minimal useful information and are of low value. Given the
time and resources required to extract meaning from large collections of
events, many companies were willing to discard this data instead. Although
infrastructure has been built to handle event-based data (e.g. IBM's
Netezza\cite { singh2011introduction} , HP's Vertica\cite { bear2012vertica} , and EMC's
Greenplum\cite { miner2012unified} ), these systems are largely sold at high price points
and are only targeted towards those companies who can afford the offering.
A few years ago, Google introduced MapReduce \cite { dean2008mapreduce} as their
mechanism of leveraging commodity hardware to index the internet and analyze
logs. The Hadoop \cite { shvachko2010hadoop} project soon followed and was
largely patterned after the insights that came out of the original MapReduce
paper. Hadoop is currently deployed in many organizations to store and analyze
large amounts of log data. Hadoop has contributed much to helping companies
convert their low-value event streams into high-value aggregates for a variety
of applications such as business intelligence and A-B testing.
As with a lot of great systems, Hadoop has opened our eyes to a new space of
problems. Specifically, Hadoop excels at storing and providing access to large
amounts of data; however, it does not make any performance guarantees around
how quickly that data can be accessed. Furthermore, although Hadoop is a
highly available system, performance degrades under heavy concurrent load.
Lastly, while Hadoop works well for storing data, it is not optimized for
ingesting data and making that data immediately readable.
Early on in the development of the Metamarkets product, we ran into each of
these issues and came to the realization that Hadoop is a great back-office,
batch processing, and data warehousing system. However, as a company that has
product-level guarantees around query performance and data availability in a
highly concurrent environment (1000+ users), we found that Hadoop was not going to meet our
needs. We explored different solutions in the space, and after
trying both Relational Database Management Systems and NoSQL architectures, we
came to the conclusion that there was nothing in the open source world that
could be fully leveraged for our requirements.
We ended up creating Druid, an open-source, distributed, column-oriented,
real-time analytical data store. In many ways, Druid shares similarities with
other OLAP systems \cite { oehler2012ibm, schrader2009oracle, lachev2005applied} ,
interactive query systems \cite { melnik2010dremel} , main-memory databases
\cite { farber2012sap} , and widely-known distributed data stores
\cite { chang2008bigtable, decandia2007dynamo, lakshman2010cassandra} . The
distribution and query model also borrow ideas from current generation search
infrastructure \cite { linkedin2013senseidb, apache2013solr,
banon2013elasticsearch} .
This paper describes the architecture of Druid, explores the various design
decisions made in creating an always-on production system that powers a hosted
service, and attempts to help inform anyone who faces a similar problem about a
potential method of solving it. Druid is deployed in production at several
technology
companies\footnote { \href { http://druid.io/druid.html} { http://druid.io/druid.html} } .
The structure of the paper is as follows: we first describe the problem in
Section \ref { sec:problem-definition} . Next, we detail system architecture from
the point of view of how data flows through the system in Section
\ref { sec:architecture} . We then discuss how and why data gets converted into a
binary format in Section \ref { sec:storage-format} . We briefly describe the
query API in Section \ref { sec:query-api} and present our experimental results
in Section \ref { sec:benchmarks} . Lastly, we share lessons learned from
running Druid in production in Section \ref { sec:production} , discuss related work
in Section \ref { sec:related} , and conclude in Section \ref { sec:conclusions} .
\section { Problem Definition}
\label { sec:problem-definition}
Druid was originally designed to solve problems around ingesting and exploring
large quantities of transactional events (log data). This form of time series
data is commonly found in OLAP workflows and the nature of the data tends to be
very append-heavy. For example, consider the data shown in
Table~\ref { tab:sample_ data} , which contains data for
edits that have occurred on Wikipedia. Each time a user edits a page in
Wikipedia, an event is generated that contains metadata about the edit. This
metadata comprises three distinct components. First, there is a timestamp
column indicating when the edit was made. Next, there is a set of dimension
columns indicating various attributes about the edit such as the page that was
edited, the user who made the edit, and the location of the user. Finally,
there is a set of metric columns that contain values (usually numeric) that
can be aggregated, such as the number of characters added or removed in an
edit.
\begin { table*}
\centering
\caption { Sample Druid data for edits that have occurred on Wikipedia.}
\label { tab:sample_ data}
\begin { tabular} { | l | l | l | l | l | l | l |}
\hline
\textbf { Timestamp} & \textbf { Page} & \textbf { Username} & \textbf { Gender} & \textbf { City} & \textbf { Characters Added} & \textbf { Characters Removed} \\ \hline
2011-01-01T01:00:00Z & Justin Bieber & Boxer & Male & San Francisco & 1800 & 25 \\ \hline
2011-01-01T01:00:00Z & Justin Bieber & Reach & Male & Waterloo & 2912 & 42 \\ \hline
2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline
2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline
\end { tabular}
\end { table*}
Our goal is to rapidly compute drill-downs and aggregates over this data. We
want to answer questions like “How many edits were made on the page Justin
Bieber from males in San Francisco?” and “What is the average number of
characters that were added by people from Calgary over the span of a month?”. We also
want queries over any arbitrary combination of dimensions to return with
sub-second latencies.
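To make the two example questions concrete, the following minimal Python sketch evaluates them directly against the four sample rows. It is a naive row scan for illustration only and not how Druid executes queries; the helper names \texttt{matching}, \texttt{count\_edits}, and \texttt{average} are hypothetical, and the ``month'' in the second question is reduced to the rows available in the sample.

```python
# Rows copied from the sample Wikipedia table in the text.
COLUMNS = ("timestamp", "page", "username", "gender", "city", "added", "removed")
RAW = [
    ("2011-01-01T01:00:00Z", "Justin Bieber", "Boxer", "Male", "San Francisco", 1800, 25),
    ("2011-01-01T01:00:00Z", "Justin Bieber", "Reach", "Male", "Waterloo", 2912, 42),
    ("2011-01-01T02:00:00Z", "Ke$ha", "Helz", "Male", "Calgary", 1953, 17),
    ("2011-01-01T02:00:00Z", "Ke$ha", "Xeno", "Male", "Taiyuan", 3194, 170),
]
ROWS = [dict(zip(COLUMNS, row)) for row in RAW]


def matching(rows, **filters):
    """Return the rows whose dimension values equal every given filter."""
    return [r for r in rows if all(r[dim] == value for dim, value in filters.items())]


def count_edits(rows, **filters):
    """Count the events (edits) that satisfy the dimension filters."""
    return len(matching(rows, **filters))


def average(rows, metric, **filters):
    """Average a metric column over the filtered rows, or None if nothing matches."""
    selected = matching(rows, **filters)
    if not selected:
        return None
    return sum(r[metric] for r in selected) / len(selected)


# "How many edits were made on the page Justin Bieber from males in San Francisco?"
edit_count = count_edits(ROWS, page="Justin Bieber", gender="Male", city="San Francisco")

# "What is the average number of characters added by people from Calgary?"
avg_added = average(ROWS, "added", city="Calgary")

print(edit_count, avg_added)
```

The scan touches every row for every query, which is exactly the cost that the storage format and indexing described later are meant to avoid.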
The need for Druid was motivated by the fact that existing open source
Relational Database Management Systems (RDBMS) and NoSQL key/value stores were
unable to provide a low latency data ingestion and query platform for
interactive applications \cite { tschetter2011druid} . In the early days of
Metamarkets, we were focused on building a hosted dashboard that would allow
users to arbitrarily explore and visualize event streams. The data store
powering the dashboard needed to return queries fast enough that the data
visualizations built on top of it could provide users with an interactive
experience.
In addition to the query latency needs, the system had to be multi-tenant and
highly available. The Metamarkets product is used in a highly concurrent
environment. Downtime is costly and many businesses cannot afford to wait if a
system is unavailable in the face of software upgrades or network failure.
Downtime for startups, who often lack proper internal operations management, can
determine business success or failure.
Finally, another key problem that Metamarkets faced in its early days was
allowing users and alerting systems to make business decisions in
“real-time”. The time from when an event is created to when that
event is queryable determines how fast users and systems are able to react to
potentially catastrophic occurrences in their systems. Popular open source data
warehousing systems such as Hadoop were unable to provide the sub-second data ingestion
latencies we required.
The problems of data exploration, ingestion, and availability span multiple
industries. Since Druid was open sourced in October 2012, it has been deployed as a
video, network monitoring, operations monitoring, and online advertising
analytics platform in multiple companies.
\section { Architecture}
\label { sec:architecture}
A Druid cluster consists of different types of nodes and each node type is
designed to perform a specific set of tasks. We believe this design separates
concerns and reduces the complexity of the system. The different node types
operate fairly independently of each other and there is minimal interaction
between them. Hence, intra-cluster communication failures have minimal impact
on data availability. To solve complex data analysis problems, the different
node types come together to form a fully working system. The name Druid comes
from the Druid class in many role-playing games: it is a shape-shifter, capable
of taking on many different forms to fulfill various different roles in a
group. The composition of and flow of data in a Druid cluster are shown in
Figure~\ref { fig:cluster} .
\begin { figure*}
\centering
\includegraphics [width = 4.5in] { cluster}
\caption { An overview of a Druid cluster and the flow of data through the cluster.}
\label { fig:cluster}
\end { figure*}
\newpage
\subsection { Real-time Nodes}
\label { sec:realtime}
Real-time nodes encapsulate the functionality to ingest and query event
streams. Events indexed via these nodes are immediately available for querying.
The nodes are only concerned with events for some small time range and
periodically hand off immutable batches of events they've collected over this
small time range to other nodes in the Druid cluster that are specialized in
dealing with batches of immutable events. Real-time nodes leverage Zookeeper
\cite { hunt2010zookeeper} for coordination with the rest of the Druid cluster.
The nodes announce their online state and the data they are serving in
Zookeeper.
Real-time nodes maintain an in-memory index buffer for all incoming events.
These indexes are incrementally populated as new events are ingested and the
indexes are also directly queryable. Druid behaves essentially as a row store
for queries on events that exist in this JVM heap-based buffer. To avoid heap
overflow problems, real-time nodes persist their in-memory indexes to disk
either periodically or after some maximum row limit is reached. This persist
process converts data stored in the in-memory buffer to a column-oriented
storage format described in Section \ref { sec:storage-format} . Each persisted
index is immutable and real-time nodes load persisted indexes into off-heap
memory such that they can still be queried. This process is described in detail
in \cite { o1996log} and is illustrated in Figure~\ref { fig:realtime_ flow} .
\begin { figure}
\centering
\includegraphics [width = 2.8in] { realtime_ flow}
\caption { Real-time nodes first buffer events in memory. On a periodic basis,
the in-memory index is persisted to disk. On another periodic basis, all
persisted indexes are merged together and handed off. Queries for data will hit the
in-memory index and the persisted indexes.}
\label { fig:realtime_ flow}
\end { figure}
On a periodic basis, each real-time node will schedule a background task that
searches for all locally persisted indexes. The task merges these indexes
together and builds an immutable block of data that contains all the events
that have been ingested by a real-time node for some span of time. We refer to this
block of data as a “segment”. During the handoff stage, a real-time node
uploads this segment to a permanent backup storage, typically a distributed
file system such as S3 \cite { decandia2007dynamo} or HDFS
\cite { shvachko2010hadoop} , which Druid refers to as “deep storage”. The ingest,
persist, merge, and handoff steps are fluid; there is no data loss during any
of the processes.
To better understand the flow of data through a real-time node, consider the
following example. First, we start a real-time node at 13:37. The node will
only accept events for the current hour or the next hour. When the node begins
ingesting events, it will announce that it is serving a segment of data for a
time window from 13:00 to 14:00. Every 10 minutes (the persist period is
configurable), the node will flush and persist its in-memory buffer to disk.
Near the end of the hour, the node will likely see events with timestamps from
14:00 to 15:00. When this occurs, the node prepares to serve data for the next
hour and creates a new in-memory index. The node then announces that it is also
serving a segment for data from 14:00 to 15:00. The node does not immediately
merge the indexes it persisted from 13:00 to 14:00; instead, it waits for a
configurable window period for straggling events from 13:00 to 14:00 to come
in. Having a window period minimizes the risk of data loss from delays in event
delivery. At the end of the window period, the real-time node merges all
persisted indexes from 13:00 to 14:00 into a single immutable segment and hands
the segment off. Once this segment is loaded and queryable somewhere else in
the Druid cluster, the real-time node flushes all information about the data it
collected for 13:00 to 14:00 and unannounces that it is serving this data. This
process is shown in Figure~\ref { fig:realtime_ timeline} .
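The timing rules in this example can be sketched in a few lines of Python. This is an illustrative model under stated assumptions, not Druid's implementation: the function names are hypothetical, the segment granularity is fixed at one hour as in the example, and the ten-minute window period is an assumed value (the text only says the window period is configurable).

```python
from datetime import datetime, timedelta

SEGMENT_GRANULARITY = timedelta(hours=1)   # hourly segments, as in the example
WINDOW_PERIOD = timedelta(minutes=10)      # assumed value; configurable in practice


def segment_interval(ts):
    """Return the [start, end) hour interval that contains the timestamp."""
    start = ts.replace(minute=0, second=0, microsecond=0)
    return start, start + SEGMENT_GRANULARITY


def accepts(now, event_ts):
    """A node only accepts events for the current hour or the next hour."""
    current_start, _ = segment_interval(now)
    event_start, _ = segment_interval(event_ts)
    return event_start in (current_start, current_start + SEGMENT_GRANULARITY)


def merge_and_handoff_time(interval):
    """Merging starts once the window period after the interval end has elapsed."""
    _, end = interval
    return end + WINDOW_PERIOD


# The node from the example is started at 13:37.
started = datetime(2013, 1, 1, 13, 37)
first_interval = segment_interval(started)
handoff_at = merge_and_handoff_time(first_interval)

print(first_interval, handoff_at)
```

Under these assumptions the node announces the 13:00 to 14:00 interval at startup, accepts events stamped between 13:00 and 15:00, and begins merging the first hour's persisted indexes at 14:10.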
\begin { figure*}
\centering
\includegraphics [width = 4.5in] { realtime_ timeline}
\caption { The node starts, ingests data, persists, and periodically hands data
off. This process repeats indefinitely. The time intervals between different
real-time node operations are configurable.}
\label { fig:realtime_ timeline}
\end { figure*}
\subsubsection { Availability and Scalability}
Real-time nodes are a consumer of data and require a corresponding producer to
provide the data stream. Commonly, for data durability purposes, a message
bus such as Kafka \cite { kreps2011kafka} sits between the producer and the
real-time node as shown in Figure~\ref { fig:realtime_ pipeline} . Real-time nodes
ingest data by reading events from the message bus. The time from event
creation to event consumption is ordinarily on the order of hundreds of
milliseconds.
\begin { figure}
\centering
\includegraphics [width = 2.8in] { realtime_ pipeline}
\caption { Multiple real-time nodes can read from the same message bus. Each node maintains its own offset.}
\label { fig:realtime_ pipeline}
\end { figure}
The purpose of the message bus in Figure~\ref { fig:realtime_ pipeline} is
two-fold. First, the message bus acts as a buffer for incoming events. A
message bus such as Kafka maintains offsets indicating the position in an event
stream that a consumer (a real-time node) has read up to and consumers can
programmatically update these offsets. Typically, real-time nodes update this
offset each time they persist their in-memory buffers to disk. In a
fail-and-recover scenario, if a node has not lost its disk, it can reload all persisted
indexes from disk and continue reading events from the last offset it
committed. Ingesting events from a recently committed offset greatly reduces a
node's recovery time. In practice, we see real-time nodes recover from such
failure scenarios in a few seconds.
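The interaction between persisting and offset commits can be illustrated with a small Python simulation. It is a sketch under simplifying assumptions rather than the actual consumer code: the message bus is a plain list, the class \texttt{RealtimeNodeSketch} and its methods are hypothetical, and persists are triggered only by a row limit.

```python
class RealtimeNodeSketch:
    """Toy model of a real-time node that reads from a message bus."""

    def __init__(self, bus, max_rows):
        self.bus = bus              # list of events; stands in for the message bus
        self.max_rows = max_rows    # persist once the buffer holds this many rows
        self.buffer = []            # in-memory index: lost when the node crashes
        self.persisted = []         # indexes on disk: survive a crash
        self.committed_offset = 0   # offset kept by the bus: survives a crash
        self.position = 0           # next offset this node will read

    def ingest_available(self):
        """Read every event currently on the bus, persisting when the buffer fills."""
        while self.position < len(self.bus):
            self.buffer.append(self.bus[self.position])
            self.position += 1
            if len(self.buffer) >= self.max_rows:
                self.persist()

    def persist(self):
        """Flush the buffer to disk and commit the offset read so far."""
        self.persisted.append(list(self.buffer))
        self.buffer.clear()
        self.committed_offset = self.position

    def crash_and_recover(self):
        """Lose the in-memory buffer, keep the disk, resume from the committed offset."""
        self.buffer = []
        self.position = self.committed_offset

    def visible_events(self):
        """Events that a query would see: persisted indexes plus the live buffer."""
        return [e for index in self.persisted for e in index] + self.buffer


bus = list(range(7))
node = RealtimeNodeSketch(bus, max_rows=3)
node.ingest_available()                  # persists twice; event 6 stays in memory
offset_before_crash = node.committed_offset
node.crash_and_recover()                 # event 6 is lost from memory
visible_after_crash = node.visible_events()
node.ingest_available()                  # event 6 is re-read from the bus
visible_after_replay = node.visible_events()
```

Because the offset is committed only after a persist, the node re-reads just the events that were in memory at the time of the crash, which is why recovery is fast and produces neither gaps nor duplicates in this model.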
The second purpose of the message bus is to act as a single endpoint from which
multiple real-time nodes can read events. Multiple real-time nodes can ingest
the same set of events from the bus, thus creating a replication of events. In
a scenario where a node completely fails and does not recover, replicated
streams ensure that no data is lost. A single ingestion endpoint also allows
for data streams to be partitioned such that multiple real-time nodes each
ingest a portion of a stream. This allows additional real-time nodes to be
seamlessly added. In practice, this model has allowed one of the largest
production Druid clusters to consume raw data at approximately 500
MB/s (150,000 events/s or 2 TB/hour).
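As a rough consistency check on these figures, assuming decimal units ($1~\mathrm{TB} = 10^{6}~\mathrm{MB}$, which is our assumption and is not stated in the measurements), the hourly volume and the implied average event size are
\[
500~\mathrm{MB/s} \times 3600~\mathrm{s/hour} = 1.8 \times 10^{6}~\mathrm{MB/hour} \approx 2~\mathrm{TB/hour},
\qquad
\frac{500~\mathrm{MB/s}}{150{,}000~\mathrm{events/s}} \approx 3.3~\mathrm{KB/event}.
\]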
\subsection { Historical Nodes}
Historical nodes encapsulate the functionality to load and serve the immutable
blocks of data (segments) created by real-time nodes. In many real-world
workflows, most of the data loaded in a Druid cluster is immutable and hence,
historical nodes are typically the main workers of a Druid cluster. Historical
nodes follow a shared-nothing architecture and there is no single point of
contention among the nodes. The nodes have no knowledge of one another and are
operationally simple; they only know how to load, drop, and serve immutable
segments.
Similar to real-time nodes, historical nodes announce their online state and
the data they are serving in Zookeeper. Instructions to load and drop segments
are sent over Zookeeper and contain information about where the segment is
located in deep storage and how to decompress and process the segment. Before
a historical node downloads a particular segment from deep storage, it first
checks a local cache that maintains information about what segments already
exist on the node. If information about a segment is not present in the cache,
the historical node will proceed to download the segment from deep storage.
This process is shown in Figure~\ref { fig:historical_ download} . Once processing
is complete, the segment is announced in Zookeeper. At this point, the segment
is queryable. The local cache also allows for historical nodes to be quickly
updated and restarted. On startup, the node examines its cache and immediately
serves whatever data it finds.
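The load path described above (check the local cache, download only on a miss, then announce) can be sketched as follows. This is a hypothetical model for illustration: deep storage is a dictionary, announcing in Zookeeper is reduced to adding an identifier to a set, and the class and attribute names are our own.

```python
class HistoricalNodeSketch:
    """Toy model of segment loading on a historical node."""

    def __init__(self, deep_storage, local_cache=None):
        self.deep_storage = deep_storage             # segment id -> segment bytes
        self.local_cache = dict(local_cache or {})   # segments already on local disk
        self.downloads = 0                           # number of deep storage fetches
        # On startup, the node immediately serves whatever its cache holds.
        self.announced = set(self.local_cache)

    def load(self, segment_id):
        """Download the segment only if the local cache does not have it."""
        if segment_id not in self.local_cache:
            self.local_cache[segment_id] = self.deep_storage[segment_id]
            self.downloads += 1
        self.announced.add(segment_id)               # the segment is now queryable

    def drop(self, segment_id):
        """Stop serving a segment and remove it from the local cache."""
        self.announced.discard(segment_id)
        self.local_cache.pop(segment_id, None)


deep_storage = {"seg_a": b"segment a", "seg_b": b"segment b"}

node = HistoricalNodeSketch(deep_storage)
node.load("seg_a")
node.load("seg_a")        # second request is answered from the local cache

# A restarted node reuses the cache left on disk by the previous process.
restarted = HistoricalNodeSketch(deep_storage, local_cache=node.local_cache)
restarted.load("seg_b")
```

In this model the restarted node serves \texttt{seg\_a} without contacting deep storage and downloads only \texttt{seg\_b}.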
\begin { figure}
\centering
\includegraphics [width = 2.6in] { historical_ download}
\caption { Historical nodes download immutable segments from deep storage. Segments must be loaded in memory before they can be queried.}
\label { fig:historical_ download}
\end { figure}
Historical nodes can support read consistency because they only deal with
immutable data. Immutable data blocks also enable a simple parallelization
model: historical nodes can concurrently scan and aggregate immutable blocks
without blocking.
\subsubsection { Tiers}
\label { sec:tiers}
Historical nodes can be grouped in different tiers, where all nodes in a
given tier are identically configured. Different performance and
fault-tolerance parameters can be set for each tier. The purpose of
tiered nodes is to enable higher or lower priority segments to be
distributed according to their importance. For example, it is possible
to spin up a “hot” tier of historical nodes that have a high number of
cores and large memory capacity. The “hot” tier can be configured to
download more frequently accessed data. A parallel “cold” tier
can also be created with much less powerful backing hardware. The
“cold” tier would only contain less frequently accessed segments.
\subsubsection { Availability}
Historical nodes depend on Zookeeper for segment load and unload instructions.
If Zookeeper becomes unavailable, historical nodes are no longer able to serve
new data or drop outdated data. However, because queries are served over
HTTP, historical nodes are still able to respond to query requests for
the data they are currently serving. This means that Zookeeper outages do not
impact current data availability on historical nodes.
\subsection { Broker Nodes}
Broker nodes act as query routers to historical and real-time nodes. Broker
nodes understand the metadata published in Zookeeper about what segments are
queryable and where those segments are located. Broker nodes route incoming queries
such that the queries hit the right historical or real-time nodes. Broker nodes
also merge partial results from historical and real-time nodes before returning
a final consolidated result to the caller.
\subsubsection { Caching}
\label { sec:caching}
Broker nodes contain a cache with an LRU \cite { o1993lru, kim2001lrfu}
invalidation strategy. The cache can use local heap memory or an external
distributed key/value store such as Memcached
\cite { fitzpatrick2004distributed} . Each time a broker node receives a query, it
first maps the query to a set of segments. Results for certain segments may
already exist in the cache and there is no need to recompute them. For any
results that do not exist in the cache, the broker node will forward the query
to the correct historical and real-time nodes. Once historical nodes return
their results, the broker will cache these results on a per segment basis for
future use. This process is illustrated in Figure~\ref { fig:caching} . Real-time
data is never cached and hence requests for real-time data will always be
forwarded to real-time nodes. Real-time data is perpetually changing and
caching the results would be unreliable.
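A minimal Python sketch of per-segment result caching is given below. It is an illustration under assumptions, not the broker's code: the cache is a local LRU built on \texttt{collections.OrderedDict}, the names \texttt{SegmentResultCache} and \texttt{run\_query} are hypothetical, partial results are plain counts that merge by addition, and the \texttt{compute} callback stands in for forwarding the query to a historical or real-time node.

```python
from collections import OrderedDict


class SegmentResultCache:
    """Small LRU cache keyed by (query, segment)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()

    def get(self, key):
        if key not in self.entries:
            return None
        self.entries.move_to_end(key)         # mark as most recently used
        return self.entries[key]

    def put(self, key, value):
        self.entries[key] = value
        self.entries.move_to_end(key)
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)  # evict the least recently used entry


def run_query(query_id, segments, cache, compute):
    """Merge per-segment partial counts, caching only immutable segments."""
    total = 0
    for segment_id, is_realtime in segments:
        key = (query_id, segment_id)
        partial = None if is_realtime else cache.get(key)
        if partial is None:
            partial = compute(segment_id)     # forward to the node serving the segment
            if not is_realtime:               # real-time results are never cached
                cache.put(key, partial)
        total += partial
    return total


calls = []


def compute(segment_id):
    """Stand-in for a remote node; records which segments were actually scanned."""
    calls.append(segment_id)
    return {"seg_a": 10, "seg_b": 20, "rt": 5}[segment_id]


cache = SegmentResultCache(capacity=100)
segments = [("seg_a", False), ("seg_b", False), ("rt", True)]
first = run_query("count_edits", segments, cache, compute)
second = run_query("count_edits", segments, cache, compute)
```

On the second run only the real-time segment is recomputed; the two historical segments are answered from the cache.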
\begin { figure*}
\centering
\includegraphics [width = 4.5in] { caching}
\caption { Broker nodes cache per segment results. Every Druid query is mapped to
a set of segments. Queries often combine cached segment results with those that
need to be computed on historical and real-time nodes.}
\label { fig:caching}
\end { figure*}
The cache also acts as an additional level of data durability. In the event
that all historical nodes fail, it is still possible to query results if those
results already exist in the cache.
\subsubsection { Availability}
In the event of a total Zookeeper outage, data is still queryable. If broker
nodes are unable to communicate with Zookeeper, they use their last known view of
the cluster and continue to forward queries to real-time and historical nodes.
Broker nodes make the assumption that the structure of the cluster is the same
as it was before the outage. In practice, this availability model has allowed
our Druid cluster to continue serving queries for a significant period of time while we
diagnosed Zookeeper outages.
\subsection { Coordinator Nodes}
Druid coordinator nodes are primarily in charge of data management and
distribution on historical nodes. The coordinator nodes tell historical nodes
to load new data, drop outdated data, replicate data, and move data to load
balance. Druid uses a multi-version concurrency control swapping protocol for
managing immutable segments in order to maintain stable views. If any
immutable segment contains data that is wholly obsoleted by newer segments, the
outdated segment is dropped from the cluster. Coordinator nodes undergo a
leader-election process that determines a single node that runs the coordinator
functionality. The remaining coordinator nodes act as redundant backups.
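The idea of a segment being wholly obsoleted can be illustrated with a simplified Python check. This is a sketch and not the coordinator's protocol: it assumes all segments belong to one data source, compares version strings lexicographically, and only detects the case where a single newer segment covers the older segment's entire interval. The function name and the dictionary layout are hypothetical.

```python
def wholly_obsoleted(segments):
    """Return segments whose whole interval is covered by one newer-version segment."""
    obsolete = []
    for seg in segments:
        for other in segments:
            newer = other["version"] > seg["version"]
            covers = other["start"] <= seg["start"] and other["end"] >= seg["end"]
            if newer and covers:
                obsolete.append(seg)
                break
    return obsolete


# Intervals are given in hours of a single day for brevity.
segments = [
    {"start": 13, "end": 14, "version": "v1"},
    {"start": 14, "end": 15, "version": "v1"},
    {"start": 13, "end": 15, "version": "v2"},   # newer data for 13:00 to 15:00
    {"start": 15, "end": 16, "version": "v1"},
]

dropped = wholly_obsoleted(segments)
```

In this example the two \texttt{v1} segments between 13:00 and 15:00 would be dropped, while the \texttt{v1} segment for 15:00 to 16:00 remains because no newer segment covers it.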
A coordinator node runs periodically to determine the current state of the
cluster. It makes decisions by comparing the expected state of the cluster with
the actual state of the cluster at the time of the run. As with all Druid
nodes, coordinator nodes maintain a Zookeeper connection for current cluster
information. Coordinator nodes also maintain a connection to a MySQL
database that contains additional operational parameters and configurations.
One of the key pieces of information located in the MySQL database is a table
that contains a list of all segments that should be served by historical nodes.
This table can be updated by any service that creates segments, for example,
real-time nodes. The MySQL database also contains a rule table that governs how
segments are created, destroyed, and replicated in the cluster.
\subsubsection { Rules}
Rules govern how historical segments are loaded into and dropped from the cluster.
Rules indicate how segments should be assigned to different historical node
tiers and how many replicas of a segment should exist in each tier. Rules may
also indicate when segments should be dropped entirely from the cluster. Rules
are usually set for a period of time. For example, a user may use rules to
load the most recent one month's worth of segments into a “hot” tier, the
most recent one year's worth of segments into a “cold” tier, and drop any
segments that are older.
The coordinator nodes load a set of rules from a rule table in the MySQL
database. Rules may be specific to a certain data source and/or a default set
of rules may be configured. The coordinator node will cycle through all available
segments and match each segment with the first rule that applies to it.
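First-match rule evaluation can be sketched as follows, using the hot, cold, and drop example from above. The rule representation is hypothetical (it is not the schema of the rule table), a month is approximated as 30 days and a year as 365 days, and segment age is measured from the end of the segment's interval.

```python
from datetime import datetime, timedelta

# Ordered rule list: (action, maximum segment age or None for "always", tier).
RULES = [
    ("load", timedelta(days=30), "hot"),     # most recent month
    ("load", timedelta(days=365), "cold"),   # most recent year
    ("drop", None, None),                    # everything older
]


def apply_rules(segment_end, now, rules=RULES):
    """Return (action, tier) from the first rule that applies to the segment."""
    age = now - segment_end
    for action, period, tier in rules:
        if period is None or age <= period:
            return action, tier
    return None, None   # no rule matched; not reachable with a catch-all rule


now = datetime(2014, 1, 1)
recent = apply_rules(datetime(2013, 12, 20), now)
older = apply_rules(datetime(2013, 6, 1), now)
ancient = apply_rules(datetime(2012, 1, 1), now)
```

Because only the first matching rule applies, rule order matters: placing the catch-all drop rule first would drop every segment.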
\subsubsection { Load Balancing}
In a typical production environment, queries often hit dozens or even hundreds
of segments. Since each historical node has limited resources, segments must be
distributed among the cluster to ensure that the cluster load is not too
imbalanced. Determining optimal load distribution requires some knowledge about
query patterns and speeds. Typically, queries cover recent segments spanning
contiguous time intervals for a single data source. On average, queries that
access smaller segments are faster.
These query patterns suggest replicating recent historical segments at a higher
rate, spreading out large segments that are close in time to different
historical nodes, and co-locating segments from different data sources. To
optimally distribute and balance segments among the cluster, we developed a
cost-based optimization procedure that takes into account the segment data
source, recency, and size. The exact details of the algorithm are beyond the
scope of this paper and may be discussed in future literature.
\subsubsection { Replication}
Coordinator nodes may tell different historical nodes to load copies of the
same segment. The number of replicas in each tier of the historical compute
cluster is fully configurable. Setups that require high levels of fault
tolerance can be configured to have a high number of replicas. Replicated
segments are treated the same as the originals and follow the same load
distribution algorithm. By replicating segments, single historical node
failures are transparent in the Druid cluster. We use this property for
software upgrades. We can seamlessly take a historical node offline, update it,
bring it back up, and repeat the process for every historical node in a
cluster. Over the last two years, we have never taken downtime in our Druid
cluster for software upgrades.
\subsubsection { Availability}
Druid coordinator nodes have two external dependencies: Zookeeper and MySQL.
Coordinator nodes rely on Zookeeper to determine what historical nodes already
exist in the cluster. If Zookeeper becomes unavailable, the coordinator will no
longer be able to send instructions to assign, balance, and drop segments.
However, these operations do not affect data availability at all.
The design principle for responding to MySQL and Zookeeper failures is the
same: if an external dependency responsible for coordination fails, the cluster
maintains the status quo. Druid uses MySQL to store operational management
information and segment metadata information about what segments should exist
in the cluster. If MySQL goes down, this information becomes unavailable to
coordinator nodes. However, this does not mean data itself is unavailable. If
coordinator nodes cannot communicate with MySQL, they will cease to assign new
segments and drop outdated ones. Broker, historical, and real-time nodes are still
queryable during MySQL outages.
\section{Storage Format}
\label{sec:storage-format}
Data tables in Druid (called \emph{data sources}) are collections of
timestamped events that are partitioned into a set of segments, where each
segment is typically 5--10 million rows. Formally, we define a segment as a
collection of rows of data that span some period of time. Segments represent the
fundamental storage unit in Druid, and replication and distribution are done at
a segment level.

Druid always requires a timestamp column as a method of simplifying data
distribution policies, data retention policies, and first-level query pruning.
Druid partitions its data sources into well-defined time intervals, typically
an hour or a day, and may further partition on values from other columns to
achieve the desired segment size. For example, partitioning the data in
Table~\ref{tab:sample_data} by hour results in two segments for 2011-01-01, and
partitioning the data by day results in a single segment. The time granularity
to partition segments is a function of data volume and time range. A data set
with timestamps spread over a year is better partitioned by day, and a data set
with timestamps spread over a day is better partitioned by hour.
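
The effect of the partitioning granularity can be sketched in a few lines of
Python. This is an illustration only, not Druid's implementation: the events,
the helper names (\texttt{interval\_start}, \texttt{partition\_by\_time}), and
the in-memory dictionary standing in for segments are all assumptions made for
the example.

```python
from collections import defaultdict
from datetime import datetime, timezone


def interval_start(timestamp, granularity):
    """Truncate a timestamp to the start of its hour or day bucket."""
    if granularity == "hour":
        return timestamp.replace(minute=0, second=0, microsecond=0)
    if granularity == "day":
        return timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported granularity: {granularity}")


def partition_by_time(events, granularity):
    """Group events into buckets keyed by the start of their time interval."""
    segments = defaultdict(list)
    for event in events:
        segments[interval_start(event["timestamp"], granularity)].append(event)
    return dict(segments)


def utc(hour, minute):
    """Helper for building hypothetical timestamps on 2011-01-01."""
    return datetime(2011, 1, 1, hour, minute, tzinfo=timezone.utc)


# Hypothetical events: two fall in the 01:00 hour and two in the 02:00 hour.
events = [
    {"timestamp": utc(1, 5), "page": "Justin Bieber"},
    {"timestamp": utc(1, 40), "page": "Justin Bieber"},
    {"timestamp": utc(2, 10), "page": "Ke$ha"},
    {"timestamp": utc(2, 55), "page": "Ke$ha"},
]

by_hour = partition_by_time(events, "hour")  # two buckets
by_day = partition_by_time(events, "day")    # one bucket
```

With these four events, hourly partitioning yields two buckets and daily
partitioning yields one, mirroring the example in the text.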

Segments are uniquely identified by a data source identifier, the time interval
of the data, and a version string that increases whenever a new segment is
created. The version string indicates the freshness of segment data; segments
with later versions have newer views of data (over some time range) than
segments with older versions. This segment metadata is used by the system for
concurrency control; read operations always access data in a particular time
range from the segments with the latest version identifiers for that time
range.
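
The version rule can be illustrated with a small Python sketch. It makes two
simplifying assumptions that are not stated in the text: version strings are
ISO 8601 timestamps that order correctly under plain string comparison, and
competing segments cover exactly the same interval. The field names are
hypothetical.

```python
def latest_segments(segments):
    """For each (data source, interval), keep the segment with the highest version."""
    latest = {}
    for segment in segments:
        key = (segment["data_source"], segment["interval"])
        current = latest.get(key)
        # Assumption: version strings compare correctly as plain strings.
        if current is None or segment["version"] > current["version"]:
            latest[key] = segment
    return latest


# Hypothetical segment metadata: the first interval has been re-indexed once.
segments = [
    {"data_source": "wikipedia", "interval": "2011-01-01/2011-01-02",
     "version": "2013-01-10T00:00:00Z"},
    {"data_source": "wikipedia", "interval": "2011-01-01/2011-01-02",
     "version": "2013-02-01T00:00:00Z"},
    {"data_source": "wikipedia", "interval": "2011-01-02/2011-01-03",
     "version": "2013-01-10T00:00:00Z"},
]

visible = latest_segments(segments)
```

A read over 2011-01-01 would then be served only from the segment versioned
\texttt{2013-02-01T00:00:00Z}; the older copy is shadowed.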

Druid segments are stored in a column orientation. Given that Druid is best
used for aggregating event streams (all data going into Druid must have a
timestamp), the advantages of storing aggregate information as columns rather
than rows are well documented \cite{abadi2008column}. Column storage allows for
more efficient CPU usage as only what is needed is actually loaded and scanned.
In a row-oriented data store, all columns associated with a row must be scanned
as part of an aggregation. The additional scan time can introduce significant
performance degradations \cite{abadi2008column}.

Druid has multiple column types to represent various data formats. Depending on
the column type, different compression methods are used to reduce the cost of
storing a column in memory and on disk. In the example given in
Table~\ref{tab:sample_data}, the page, user, gender, and city columns only
contain strings. Storing strings directly is unnecessarily costly and string
columns can be dictionary encoded instead. Dictionary encoding is a common
method to compress data and has been used in other data stores such as
PowerDrill \cite{hall2012processing}. In the example in
Table~\ref{tab:sample_data}, we can map each page to a unique integer
identifier.
\begin{verbatim}
Justin Bieber -> 0
Ke$ha         -> 1
\end{verbatim}
This mapping allows us to represent the page column as an integer
array where the array indices correspond to the rows of the original
data set. For the page column, we can represent the rows as follows:
\begin{verbatim}
[0, 0, 1, 1]
\end{verbatim}
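
The encoding step above can be sketched in Python as follows. This is a minimal
illustration under one assumption: identifiers are assigned in order of first
appearance, which is a choice made for the example and is not stated in the
text. The function name \texttt{dictionary\_encode} is hypothetical.

```python
def dictionary_encode(values):
    """Return (dictionary, encoded) where encoded[i] is the id of values[i]."""
    dictionary = {}
    encoded = []
    for value in values:
        if value not in dictionary:
            # Assumption: ids are handed out in order of first appearance.
            dictionary[value] = len(dictionary)
        encoded.append(dictionary[value])
    return dictionary, encoded


page_column = ["Justin Bieber", "Justin Bieber", "Ke$ha", "Ke$ha"]
dictionary, encoded = dictionary_encode(page_column)
# dictionary == {"Justin Bieber": 0, "Ke$ha": 1}
# encoded == [0, 0, 1, 1]
```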
The resulting integer array lends itself very well to
compression methods. Generic compression algorithms on top of encodings are
extremely common in column-stores. Druid uses the LZF \cite{liblzf2013} compression
algorithm.

Similar compression methods can be applied to numeric
columns. For example, the characters added and characters removed columns in
Table~\ref{tab:sample_data} can also be expressed as individual
arrays.
\begin{verbatim}
Characters Added   -> [1800, 2912, 1953, 3194]
Characters Removed -> [25, 42, 17, 170]
\end{verbatim}
In this case, we compress the raw values as opposed to their dictionary
representations.
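
To give a feel for how well such arrays compress, the sketch below packs an
integer column into bytes and applies a generic compressor. Note the
substitution: Python's standard library has no LZF binding, so \texttt{zlib}
is used here purely as a stand-in, and the 8-byte integer layout is an
assumption of the example rather than Druid's on-disk format.

```python
import zlib
from array import array


def compress_column(values):
    """Pack integers as 8-byte signed values and compress the bytes."""
    raw = array("q", values).tobytes()
    return zlib.compress(raw)  # stand-in for LZF


def decompress_column(blob):
    """Invert compress_column and return a list of integers."""
    values = array("q")
    values.frombytes(zlib.decompress(blob))
    return values.tolist()


# A dictionary-encoded column with long runs of repeated ids (hypothetical).
encoded_pages = [0] * 2000 + [1] * 2000
blob = compress_column(encoded_pages)
raw_size = len(encoded_pages) * 8
```

Highly repetitive columns such as this one shrink to a small fraction of their
raw size, and decompression returns the original values unchanged.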
\subsection{Indices for Filtering Data}
In many real-world OLAP workflows, queries are issued for the aggregated
results of some set of metrics where some set of dimension specifications are
met. An example query is: ``How many Wikipedia edits were done by users in
San Francisco who are also male?'' This query is filtering the Wikipedia data
set in Table~\ref{tab:sample_data} based on a Boolean expression of dimension
values. In many real-world data sets, dimension columns contain strings and
metric columns contain numeric values. Druid creates additional lookup
indices for string columns such that only those rows that pertain to a
particular query filter are ever scanned.

Let us consider the page column in
Table~\ref{tab:sample_data}. For each unique page in
Table~\ref{tab:sample_data}, we can form some representation
indicating in which table rows a particular page is seen. We can
store this information in a binary array where the array indices
represent our rows. If a particular page is seen in a certain
row, that array index is marked as \texttt{1}. For example:
\begin{verbatim}
Justin Bieber -> rows [0, 1] -> [1][1][0][0]
Ke$ha         -> rows [2, 3] -> [0][0][1][1]
\end{verbatim}
\texttt{Justin Bieber} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values
to row indices forms an inverted index \cite{tomasic1993performance}. To know which
rows contain {\ttfamily Justin Bieber} or {\ttfamily Ke\$ha}, we can \texttt{OR} together
the two arrays.
\begin{verbatim}
[1][1][0][0] OR [0][0][1][1] = [1][1][1][1]
\end{verbatim}
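
The bitmap construction and the Boolean combination can be sketched in Python.
The sketch uses uncompressed lists of 0/1 flags for readability; the
\texttt{city} column and all function names are hypothetical and serve only to
show an \texttt{AND} filter alongside the \texttt{OR} from the text.

```python
def build_bitmap_index(column):
    """Map each distinct value to a list of 0/1 flags, one flag per row."""
    index = {}
    for row, value in enumerate(column):
        bitmap = index.setdefault(value, [0] * len(column))
        bitmap[row] = 1
    return index


def bitmap_or(left, right):
    """Rows that match either bitmap."""
    return [a | b for a, b in zip(left, right)]


def bitmap_and(left, right):
    """Rows that match both bitmaps."""
    return [a & b for a, b in zip(left, right)]


def matching_rows(bitmap):
    """Row numbers whose flag is set; only these rows need to be scanned."""
    return [row for row, bit in enumerate(bitmap) if bit]


page = ["Justin Bieber", "Justin Bieber", "Ke$ha", "Ke$ha"]
city = ["San Francisco", "Waterloo", "Calgary", "San Francisco"]  # hypothetical

page_index = build_bitmap_index(page)
city_index = build_bitmap_index(city)

either_page = bitmap_or(page_index["Justin Bieber"], page_index["Ke$ha"])
kesha_in_sf = bitmap_and(page_index["Ke$ha"], city_index["San Francisco"])
```

Here \texttt{either\_page} covers all four rows, while \texttt{kesha\_in\_sf}
selects only row 3, so a filtered aggregation would touch a single row.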
\begin{figure}
\centering
\includegraphics[width = 3in]{concise_plot}
\caption{Integer array size versus Concise set size.}
\label{fig:concise_plot}
\end{figure}
This approach of performing Boolean operations on large bitmap sets is commonly
used in search engines. Bitmap indices for OLAP workloads are described in
detail in \cite{o1997improved}. Bitmap compression algorithms are a
well-defined area of research and often utilize run-length encoding. Popular
algorithms include Byte-aligned Bitmap Code \cite{antoshenkov1995byte},
Word-Aligned Hybrid (WAH) code \cite{wu2006optimizing}, and Partitioned
Word-Aligned Hybrid (PWAH) compression \cite{van2011memory}. Druid opted to use
the Concise algorithm \cite{colantonio2010concise} as it can outperform WAH by
reducing the size of the compressed bitmaps by up to 50\%.
Figure~\ref{fig:concise_plot} illustrates the number of bytes using Concise
compression versus using an integer array. The results were generated on a
cc2.8xlarge system with a single thread, 2G heap, 512m young gen, and a forced
GC between each run. The data set is a single day's worth of data collected
from the Twitter garden hose \cite{twitter2013} data stream. The data set
contains 2,272,295 rows and 12 dimensions of varying cardinality. As an
additional comparison, we also re-sorted the data set rows to maximize
compression.

In the unsorted case, the total Concise size was 53,451,144 bytes and the total
integer array size was 127,248,520 bytes. Overall, Concise compressed sets are
about 42\% of the size of integer arrays. In the sorted case, the total Concise
compressed size was 43,832,884 bytes and the total integer array size was
127,248,520 bytes. What is interesting to note is that after sorting, global
compression only increased minimally. The ratio of total Concise set size to
total integer array size is 34\%. It is also interesting to note that as the
cardinality of a dimension approaches the total number of rows in a data set,
integer arrays require less space than Concise sets and become a better
alternative.
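
The two percentages follow directly from the byte counts quoted above; the
short Python check below reproduces them. Only the constant and function names
are introduced here, the figures themselves come from the text.

```python
INTEGER_ARRAY_BYTES = 127_248_520
CONCISE_UNSORTED_BYTES = 53_451_144
CONCISE_SORTED_BYTES = 43_832_884


def size_ratio(compressed_bytes, baseline_bytes):
    """Compressed size expressed as a fraction of the baseline size."""
    return compressed_bytes / baseline_bytes


unsorted_ratio = size_ratio(CONCISE_UNSORTED_BYTES, INTEGER_ARRAY_BYTES)
sorted_ratio = size_ratio(CONCISE_SORTED_BYTES, INTEGER_ARRAY_BYTES)

print(f"unsorted: {unsorted_ratio:.0%} of integer array size")  # 42%
print(f"sorted:   {sorted_ratio:.0%} of integer array size")    # 34%
```

In both cases the percentage is the Concise size relative to the integer array
size, so sorting moves the ratio from roughly 42\% to roughly 34\%.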
\subsection{Storage Engine}
Druid's persistence components allow for different storage engines to be
plugged in, similar to Dynamo \cite{decandia2007dynamo}. These storage engines
may store data in an entirely in-memory structure such as the JVM heap or in
memory-mapped structures. The ability to swap storage engines allows for Druid
to be configured depending on a particular application's specifications. An
in-memory storage engine may be operationally more expensive than a
memory-mapped storage engine but could be a better alternative if performance
is critical. By default, a memory-mapped storage engine is used.

When using a memory-mapped storage engine, Druid relies on the operating system
to page segments in and out of memory. Given that segments can only be scanned
if they are loaded in memory, a memory-mapped storage engine allows recent
segments to remain in memory whereas segments that are never queried are paged
out. The main drawback of the memory-mapped storage engine arises when a
query requires more segments to be paged into memory than a given node has
capacity for. In this case, query performance will suffer from the cost of
paging segments in and out of memory.
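
The idea of letting the operating system page column data in on demand can be
sketched with Python's \texttt{mmap} module. This is not Druid's storage
engine (which runs on the JVM); the file layout of little-endian 8-byte
integers, the file name, and the function names are assumptions made for the
example.

```python
import mmap
import os
import struct
import tempfile


def write_column(path, values):
    """Persist a numeric column as little-endian 8-byte signed integers."""
    with open(path, "wb") as handle:
        handle.write(struct.pack(f"<{len(values)}q", *values))


def sum_column_mmap(path):
    """Sum a column by memory-mapping its file instead of reading it into the heap."""
    with open(path, "rb") as handle:
        with mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
            count = len(mapped) // 8
            # The OS pages file contents in as the mapped bytes are touched.
            return sum(struct.unpack_from(f"<{count}q", mapped, 0))


path = os.path.join(tempfile.mkdtemp(), "characters_added.col")
write_column(path, [1800, 2912, 1953, 3194])
total = sum_column_mmap(path)
```

The process never allocates its own copy of the whole file up front; pages that
are not touched again can be evicted by the operating system, which is the
trade-off described above.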
\section{Query API}
\label{sec:query-api}
Druid has its own query language and accepts queries as POST requests. Broker,
historical, and real-time nodes all share the same query API.

The body of the POST request is a JSON object containing key-value pairs
specifying various query parameters. A typical query will contain the data
source name, the granularity of the result data, time range of interest, the
type of request, and the metrics to aggregate over. The result will also be a
JSON object containing the aggregated metrics over the time period.

Most query types will also support a filter set. A filter set is a Boolean
expression of dimension name and value pairs. Any number and combination of
dimensions and values may be specified. When a filter set is provided, only
the subset of the data that pertains to the filter set will be scanned. The
ability to handle complex nested filter sets is what enables Druid to drill
into data at any depth.
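
A nested filter set can be modelled as a small expression tree. The Python
sketch below evaluates such a tree against individual rows. The
\texttt{selector} shape follows the sample query in this section; the
\texttt{and}, \texttt{or}, and \texttt{not} node shapes, the rows, and the
function name \texttt{matches} are assumptions made for illustration, and a
real implementation would consult bitmap indices rather than test rows one by
one.

```python
def matches(filter_spec, row):
    """Evaluate a nested filter expression against one row (a dict of dimensions)."""
    kind = filter_spec["type"]
    if kind == "selector":
        return row.get(filter_spec["dimension"]) == filter_spec["value"]
    if kind == "and":
        return all(matches(child, row) for child in filter_spec["fields"])
    if kind == "or":
        return any(matches(child, row) for child in filter_spec["fields"])
    if kind == "not":
        return not matches(filter_spec["field"], row)
    raise ValueError(f"unknown filter type: {kind}")


# "Edits by male users in San Francisco", expressed as a nested filter.
male_in_sf = {
    "type": "and",
    "fields": [
        {"type": "selector", "dimension": "city", "value": "San Francisco"},
        {"type": "selector", "dimension": "gender", "value": "Male"},
    ],
}

# Hypothetical rows.
rows = [
    {"page": "Justin Bieber", "city": "San Francisco", "gender": "Male"},
    {"page": "Justin Bieber", "city": "Waterloo", "gender": "Male"},
    {"page": "Ke$ha", "city": "San Francisco", "gender": "Female"},
]

selected = [row for row in rows if matches(male_in_sf, row)]
```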
The exact query syntax depends on the query type and the information requested.
A sample count query over a week of data is as follows:
\newpage
\begin{verbatim}
{
  "queryType"    : "timeseries",
  "dataSource"   : "wikipedia",
  "intervals"    : "2013-01-01/2013-01-08",
  "filter"       : {
    "type"      : "selector",
    "dimension" : "page",
    "value"     : "Ke$ha"
  },
  "granularity"  : "day",
  "aggregations" : [{
    "type" : "count",
    "name" : "rows"
  }]
}
\end{verbatim}
The query shown above will return a count of the number of rows in the Wikipedia data source
from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the ``page'' dimension is
equal to ``Ke\$ha''. The results will be bucketed by day and will be a JSON array of the following form:
\begin{verbatim}
[ {
    "timestamp": "2013-01-01T00:00:00.000Z",
    "result": {
      "rows": 393298
    }
  },
  {
    "timestamp": "2013-01-02T00:00:00.000Z",
    "result": {
      "rows": 382932
    }
  },
  ...
  {
    "timestamp": "2013-01-07T00:00:00.000Z",
    "result": {
      "rows": 1337
    }
  } ]
\end{verbatim}
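
A client for this API needs little more than a JSON encoder and an HTTP
library. The Python sketch below builds the query shown above as a POST
request and sums the row counts from a response of the form just described.
The broker URL is a placeholder chosen for the example and is not given in the
text, the function names are hypothetical, and the request is only constructed,
not sent.

```python
import json
import urllib.request

# Hypothetical broker address; substitute the endpoint of an actual cluster.
BROKER_URL = "http://localhost:8082/druid/v2/"


def build_count_query(data_source, interval, page):
    """Build a timeseries count query filtered on the page dimension."""
    return {
        "queryType": "timeseries",
        "dataSource": data_source,
        "intervals": interval,
        "filter": {"type": "selector", "dimension": "page", "value": page},
        "granularity": "day",
        "aggregations": [{"type": "count", "name": "rows"}],
    }


def build_request(query, url=BROKER_URL):
    """Wrap the query in a POST request with a JSON body."""
    body = json.dumps(query).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def total_rows(result):
    """Sum the per-day row counts of a timeseries result."""
    return sum(bucket["result"]["rows"] for bucket in result)


query = build_count_query("wikipedia", "2013-01-01/2013-01-08", "Ke$ha")
request = build_request(query)
# urllib.request.urlopen(request) would submit the query to a live broker.

# A response limited to the three daily buckets written out in the text.
sample_result = json.loads("""
[
  {"timestamp": "2013-01-01T00:00:00.000Z", "result": {"rows": 393298}},
  {"timestamp": "2013-01-02T00:00:00.000Z", "result": {"rows": 382932}},
  {"timestamp": "2013-01-07T00:00:00.000Z", "result": {"rows": 1337}}
]
""")
rows_seen = total_rows(sample_result)
```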
Druid supports many types of aggregations including double sums, long sums,
minimums, maximums, and several others. Druid also supports complex aggregations
such as cardinality estimation and approximate quantile estimation. The
results of aggregations can be combined in mathematical expressions to form
other aggregations. The query API is highly customizable and can be extended to
filter and group results based on almost any arbitrary condition. It is beyond
the scope of this paper to fully describe the query API but more information
can be found
online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}.

At the time of writing, the query language does not support joins. Although the
storage format is able to support joins, we've targeted Druid at user-facing
workloads that must return in a matter of seconds, and as such, we've chosen to
not spend the time to implement joins as it has been our experience that
requiring joins on your queries often limits the performance you can achieve.

\newpage
\section{Performance}
\label{sec:benchmarks}
Druid runs in production at several organizations, and to demonstrate its
performance, we've chosen to share some real-world numbers from the production
cluster at Metamarkets. The data covers February 2014.
\subsection{Query Performance}
Druid query performance can vary significantly depending on the actual query
being issued. For example, determining the approximate cardinality of a given
dimension is a much more expensive operation than a simple sum of a metric
column. Similarly, sorting the values of a high cardinality dimension based on
a given metric is much more expensive than a simple count over a time range.
Furthermore, the time range of a query and the number of metric aggregators in
the query will contribute to query latency. Instead of going into full detail
about every query issued in our production cluster, we've chosen to
showcase a higher level view of average latencies in our cluster. We selected 8
of our most queried data sources, described in Table~\ref{tab:datasources}.
\begin{table}
  \centering
  \caption{Dimensions and metrics of the 8 most queried Druid data sources in production.}
  \label{tab:datasources}
  \begin{tabular}{| l | l | l |}
    \hline
    \textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline
    \texttt{a} & 25 & 21 \\ \hline
    \texttt{b} & 30 & 26 \\ \hline
    \texttt{c} & 71 & 35 \\ \hline
    \texttt{d} & 60 & 19 \\ \hline
    \texttt{e} & 29 & 8 \\ \hline
    \texttt{f} & 30 & 16 \\ \hline
    \texttt{g} & 26 & 18 \\ \hline
    \texttt{h} & 78 & 14 \\ \hline
  \end{tabular}
\end{table}
Some more details of our results:

\begin{itemize}
\item The results are from a ``hot'' tier in our production cluster. We run several tiers of varying performance in production.
\item There is approximately 10.5 TB of RAM available in the ``hot'' tier and approximately 10 TB of segments loaded (including replication). Collectively, there are about 50 billion Druid rows in this tier. Results for every data source are not shown.
\item The hot tier uses Xeon E5-2670 processors and consists of 1302 processing threads and 672 total cores (hyperthreaded).
\item A memory-mapped storage engine was used (the machine was configured to memory map the data
instead of loading it into the Java heap).
\end{itemize}
The average query latency is shown in Figure~\ref{fig:avg_query_latency} and
the queries per minute are shown in Figure~\ref{fig:queries_per_min}. We can see
that across the various data sources, the average query latency is approximately
540ms. The 90th percentile query latency across these data sources is $<$ 1s, the
95th percentile is $<$ 2s, and the 99th percentile is $<$ 10s. The percentiles are
shown in Figure~\ref{fig:query_percentiles}. It is possible to
decrease query latencies by adding additional hardware, but we have not chosen
to do so because infrastructure cost is still a consideration.
\begin{figure}
\centering
\includegraphics[width = 2.8in]{avg_query_latency}
\caption{Druid production cluster average query latencies for multiple data sources.}
\label{fig:avg_query_latency}
\end{figure}
\begin{figure}
\centering
\includegraphics[width = 2.8in]{queries_per_min}
\caption{Druid production cluster queries per minute for multiple data sources.}
\label{fig:queries_per_min}
\end{figure}
\begin{figure}
\centering
\includegraphics[width = 2.8in]{query_percentiles}
\caption{Druid production cluster 90th, 95th, and 99th query latency percentiles for the 8 most queried data sources.}
\label{fig:query_percentiles}
\end{figure}
We also present Druid benchmarks with TPC-H data. Our setup used Amazon EC2
m3.2xlarge (CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz) instances for
historical nodes. Most TPC-H queries do not directly apply to Druid, so we
selected similar queries to demonstrate Druid's query performance. As a
comparison, we also provide the results of the same queries using MySQL with
MyISAM (InnoDB was slower in our experiments). Our MySQL setup was an Amazon
RDS instance that also ran on an m3.2xlarge node. We selected MySQL to benchmark
against because of its universal popularity. We chose not to select another
open source column store because we were not confident we could correctly tune
it for optimal performance. The results for the 1 GB TPC-H data set are shown
in Figure~\ref{fig:tpch_1gb} and the results for the 100 GB data set are shown
in Figure~\ref{fig:tpch_100gb}. We benchmarked Druid's scan rate at
53,539,211.1 rows/second/core for count(*) over a given interval and 36,246,530
rows/second/core for an aggregation involving floats.
\begin{figure}
\centering
\includegraphics[width = 2.8in]{tpch_1gb}
\caption{Druid and MySQL (MyISAM) benchmarks with the TPC-H 1 GB data set.}
\label{fig:tpch_1gb}
\end{figure}
\begin{figure}
\centering
\includegraphics[width = 2.8in]{tpch_100gb}
\caption{Druid and MySQL (MyISAM) benchmarks with the TPC-H 100 GB data set.}
\label{fig:tpch_100gb}
\end{figure}
Finally, we present our results of scaling Druid to meet increasing data
volumes with the TPC-H 100 GB data set. Our distributed cluster used Amazon EC2
c3.2xlarge (Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz) instances for broker
nodes. We observe that when we increased the number of cores from 8 to 48, we
did not always see linear scaling. The increase in speed of a parallel
computing system is often limited by the time needed for the sequential
operations of the system, in accordance with Amdahl's law
\cite{amdahl1967validity}. Our query results and query speedup are shown in
Figure~\ref{fig:tpch_scaling}.
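
For reference, Amdahl's law bounds the speedup $S(n)$ of a workload on $n$
cores when only a fraction $p$ of the work can be parallelized:
\[
  S(n) = \frac{1}{(1 - p) + \frac{p}{n}} .
\]
As a purely illustrative calculation (the value of $p$ below is an assumption,
not a measurement from our cluster), take $p = 0.9$. Then
$S(8) = 1 / (0.1 + 0.1125) \approx 4.71$ and
$S(48) = 1 / (0.1 + 0.01875) \approx 8.42$, so moving from 8 to 48 cores gives
a relative speedup of about $8.42 / 4.71 \approx 1.79$ even though the core
count grew sixfold.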
\begin{figure}
\centering
\includegraphics[width = 2.8in]{tpch_scaling}
\caption{Scaling a Druid cluster with the TPC-H 100 GB data set.}
\label{fig:tpch_scaling}
\end{figure}
\subsection{Data Ingestion Performance}
To showcase Druid's data ingestion latency, we selected several production
data sources of varying dimensions, metrics, and event volumes. Our production
ingestion setup is as follows:
\begin{itemize}
\item Total RAM: 360 GB
\item Total CPU: 12 x Intel Xeon E5-2670 (96 cores)
\item Note: In this setup, several other data sources were being ingested and many other Druid-related ingestion tasks were running across these machines.
\end{itemize}
Druid's data ingestion latency is heavily dependent on the complexity of the
data set being ingested. The data complexity is determined by the number of
dimensions in each event, the number of metrics in each event, and the types of
aggregations we want to perform on those metrics. With the most basic data set
(one that only has a timestamp column), our setup can ingest data at a rate of
800,000 events/sec/core, which is really just a measurement of how fast we can
deserialize events. Real-world data sets are never this simple. A description
of the data sources we selected is shown in Table~\ref{tab:ingest_datasources}.
\begin{table}
  \centering
  \caption{Dimensions, metrics, and peak throughputs of various ingested data sources.}
  \label{tab:ingest_datasources}
  \begin{tabular}{| l | l | l | l |}
    \hline
    \textbf{Data Source} & \textbf{Dims} & \textbf{Mets} & \textbf{Peak Throughput (events/sec)} \\ \hline
    \texttt{s} & 7 & 2 & 28334.60 \\ \hline
    \texttt{t} & 10 & 7 & 68808.70 \\ \hline
    \texttt{u} & 5 & 1 & 49933.93 \\ \hline
    \texttt{v} & 30 & 10 & 22240.45 \\ \hline
    \texttt{w} & 35 & 14 & 135763.17 \\ \hline
    \texttt{x} & 28 & 6 & 46525.85 \\ \hline
    \texttt{y} & 33 & 24 & 162462.41 \\ \hline
    \texttt{z} & 33 & 24 & 95747.74 \\ \hline
  \end{tabular}
\end{table}
We can see that, based on the descriptions in
Table~\ref{tab:ingest_datasources}, throughputs vary significantly and the
ingestion throughput is not always a function of the number of dimensions and
metrics. We see some lower throughputs on simple data sets because that was the
rate at which the data producer was delivering data. The results are shown in
Figure~\ref{fig:ingestion_rate}. We define throughput as the number of events a
real-time node can ingest and also make queryable. If too many events are sent
to the real-time node, those events are blocked until the real-time node has
capacity to accept them.
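
The blocking behaviour described above is a form of backpressure. The Python
sketch below illustrates the idea with a bounded queue; it is not how Druid's
real-time nodes are implemented, and the class name, capacity, and timeout are
all assumptions of the example.

```python
import queue


class IngestBuffer:
    """A bounded buffer: producers wait when the consumer falls behind."""

    def __init__(self, capacity):
        self._events = queue.Queue(maxsize=capacity)

    def offer(self, event, timeout=None):
        """Wait for room (forever if timeout is None); return True once accepted."""
        try:
            self._events.put(event, timeout=timeout)
            return True
        except queue.Full:
            return False

    def drain_one(self):
        """Consume one event, freeing capacity for a waiting producer."""
        return self._events.get()


buffer = IngestBuffer(capacity=2)
accepted_first = buffer.offer("event-1")
accepted_second = buffer.offer("event-2")
# The buffer is full, so this offer gives up after a short wait.
accepted_while_full = buffer.offer("event-3", timeout=0.01)
drained = buffer.drain_one()
# Capacity was freed, so the same event is now accepted.
accepted_after_drain = buffer.offer("event-3", timeout=0.01)
```

With \texttt{timeout=None} a producer simply blocks until capacity is
available, which matches the behaviour described in the text; the short timeout
is used here only so the example terminates.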

\begin{figure}
\centering
\includegraphics[width = 2.8in]{ingestion_rate}
\caption{Druid production cluster ingestion rates for multiple data sources.}
\label{fig:ingestion_rate}
\end{figure}
The peak ingestion throughput we measured in production was 22914.43 events/sec/core
on an Amazon EC2 cc2.8xlarge. The data source had 30 dimensions and 19 metrics.
\section{Druid in Production}
\label{sec:production}
Over the last few years, we've gained tremendous
knowledge about handling production workloads, setting up correct operational
monitoring, integrating Druid with other products as part of a more
sophisticated data analytics stack, and distributing data to handle entire data
center outages. One of the most important lessons we've learned is that no
amount of testing can accurately simulate a production environment, and failures
will occur for every imaginable and unimaginable reason. Interestingly, most of
our most severe crashes were due to misunderstanding the impacts a
seemingly small feature would have on the overall system.

Some of our more interesting observations include:
\begin{itemize}
\item Druid is often used in production to power exploratory dashboards. Many
users of exploratory dashboards are not from technical backgrounds, and they
often issue queries without understanding the impacts to the underlying system.
For example, some users become impatient that their queries for terabytes of
data do not return in milliseconds and continuously refresh their dashboard
view, generating heavy load on Druid. This type of usage forced Druid to defend
itself against expensive repetitive queries.
\item Cluster query performance benefits from multitenancy. Hosting every
production data source in the same cluster leads to better data parallelization
as additional nodes are added.
\item Even if you provide users with the ability to arbitrarily explore data,
they often only have a few questions in mind. Caching is extremely important in
this case, and we see very high cache hit rates.
\item When using a memory-mapped storage engine, even a small amount of paging
data from disk can severely impact query performance. SSDs greatly mitigate
this problem.
\item Leveraging approximate algorithms can greatly reduce data storage costs and
improve query performance. Many users do not care about exact answers to their
questions and are comfortable with a few percentage points of error.
\end{itemize}
\subsection{Operational Monitoring}
Proper monitoring is critical to run a large scale distributed cluster.
Each Druid node is designed to periodically emit a set of operational metrics.
These metrics may include system level data such as CPU usage, available
memory, and disk capacity; JVM statistics such as garbage collection time and
heap usage; or node specific metrics such as segment scan time, cache
hit rates, and data ingestion latencies. For each query, Druid nodes can also
emit metrics about the details of the query such as the number of filters
applied, or the interval of data requested.

Metrics can be emitted from a production Druid cluster into a dedicated metrics
Druid cluster. Queries can be made to the metrics Druid cluster to explore
production cluster performance and stability. Leveraging a dedicated metrics
cluster has allowed us to find numerous production problems, such as gradual
query speed degradations, less than optimally tuned hardware, and various other
system bottlenecks. We also use a metrics cluster to analyze what queries are
made in production. This analysis allows us to determine what our users are
most often doing and we use this information to drive our road map.
\subsection{Pairing Druid with a Stream Processor}
At the time of writing, Druid can only understand fully denormalized data
streams. In order to provide full business logic in production, Druid can be
paired with a stream processor such as Apache Storm \cite{marz2013storm}. A
Storm topology consumes events from a data stream, retains only those that are
``on-time'', and applies any relevant business logic. This could range from
simple transformations, such as id to name lookups, up to complex operations
such as multi-stream joins. The Storm topology forwards the processed event
stream to Druid in real-time. Storm handles the streaming data processing work,
and Druid is used for responding to queries for both real-time and
historical data.
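
The pre-processing stage can be illustrated independently of any particular
stream processor. The Python sketch below is not Storm code: it is a plain
generator that drops events outside an assumed ``on-time'' window and applies
an id to name lookup so that the output is fully denormalized. The window
length, field names, and lookup table are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def preprocess(events, now, window, id_to_name):
    """Yield denormalized events whose timestamps lie within `window` of `now`."""
    for event in events:
        if abs(now - event["timestamp"]) > window:
            continue  # not on-time: drop instead of forwarding
        enriched = {key: value for key, value in event.items() if key != "page_id"}
        # Simple business logic: replace the id with its human-readable name.
        enriched["page"] = id_to_name.get(event["page_id"], "unknown")
        yield enriched


now = datetime(2013, 1, 1, 12, 0, tzinfo=timezone.utc)
page_names = {1: "Justin Bieber", 2: "Ke$ha"}  # hypothetical lookup table

incoming = [
    {"timestamp": now - timedelta(minutes=2), "page_id": 2, "added": 17},
    {"timestamp": now - timedelta(hours=3), "page_id": 1, "added": 42},  # late
]

forwarded = list(preprocess(incoming, now, timedelta(minutes=10), page_names))
```

Only the first event is forwarded, with \texttt{page\_id} replaced by the page
name; the late event never reaches the data store.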
\subsection{Multiple Data Center Distribution}
Large scale production outages may not only affect single nodes, but entire
data centers as well. The tier configuration in Druid coordinator nodes allows
for segments to be replicated across multiple tiers. Hence, segments can be
exactly replicated across historical nodes in multiple data centers.
Similarly, query preference can be assigned to different tiers. It is possible
to have nodes in one data center act as a primary cluster (and receive all
queries) and have a redundant cluster in another data center. Such a setup may
be desired if one data center is situated much closer to users.
2014-02-23 14:36:39 -05:00
\section { Related Work}
\label { sec:related}
Cattell \cite { cattell2011scalable} maintains a great summary about existing
Scalable SQL and NoSQL data stores. Hu \cite { hu2011stream} contributed another
great summary for streaming databases. Druid feature-wise sits somewhere
between Google’ s Dremel \cite { melnik2010dremel} and PowerDrill
\cite { hall2012processing} . Druid has most of the features implemented in Dremel
(Dremel handles arbitrary nested data structures while Druid only allows for a
single level of array-based nesting) and many of the interesting compression
algorithms mentioned in PowerDrill.
Although Druid builds on many of the same principles as other distributed
columnar data stores \cite { fink2012distributed} , many of these data stores are
designed to be more generic key-value stores \cite { lakshman2010cassandra} and do not
support computation directly in the storage layer. There are also other data
stores designed for some of the same of the data warehousing issues that Druid
is meant to solve. These systems include include in-memory databases such as
SAP’ s HANA \cite { farber2012sap} and VoltDB \cite { voltdb2010voltdb} . These data
stores lack Druid's low latency ingestion characteristics. Druid also has
native analytical features baked in, similar to \cite { paraccel2013} , however,
Druid allows system wide rolling software updates with no downtime.
Druid is similiar to \cite { stonebraker2005c, cipar2012lazybase} in that it has
two subsystems, a read-optimized subsystem in the historical nodes and a
write-optimized subsystem in real-time nodes. Real-time nodes are designed to
ingest a high volume of append heavy data, and do not support data updates.
Unlike the two aforementioned systems, Druid is meant for OLAP transactions and
not OLTP transactions.
Druid's low latency data ingestion features share some similarities with
Trident/Storm \cite { marz2013storm} and Streaming Spark
\cite { zaharia2012discretized} , however, both systems are focused on stream
processing whereas Druid is focused on ingestion and aggregation. Stream
processors are great complements to Druid as a means of pre-processing the data
before the data enters Druid.
There are a class of systems that specialize in queries on top of cluster
computing frameworks. Shark \cite { engle2012shark} is such a system for queries
on top of Spark, and Cloudera's Impala \cite { cloudera2013} is another system
focused on optimizing query performance on top of HDFS. Druid historical nodes
download data locally and only work with native Druid indexes. We believe this
setup allows for faster query latencies.
Druid leverages a unique combination of algorithms in its
architecture. Although we believe no other data store has the same set
of functionality as Druid, some of Druid's optimization techniques such as using
inverted indices to perform fast filters are also used in other data
stores \cite { macnicol2004sybase} .
\section { Conclusions}
\label { sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented, real-time
analytical data store. Druid is designed to power high performance applications
and is optimized for low query latencies. Druid supports streaming data
ingestion and is fault-tolerant. We discussed Druid's benchmark results and
summarized key aspects of its architecture, such
as the storage format, query language, and general execution.
\balance
\section { Acknowledgements}
\label { sec:acknowledgements}
Druid could not have been built without the help of many great engineers at
Metamarkets and in the community. We want to thank everyone who has
contributed to the Druid codebase for their invaluable support.
% The following two commands are all you need in the
% initial runs of your .tex file to
% produce the bibliography for the citations in your paper.
\bibliographystyle { abbrv}
\bibliography { druid} % druid.bib is the name of the Bibliography in this case
% You must have a proper ".bib" file
% and remember to run:
% latex bibtex latex latex
% to resolve all references
%Generated by bibtex from your ~.bib file. Run latex,
%then bibtex, then latex twice (to resolve references).
%APPENDIX is optional.
% ****************** APPENDIX **************************************
% Example of an appendix; typically would start on a new page
%pagebreak
\end { document}