more edits

fjy 2014-03-12 16:28:15 -07:00
parent 516e855144
commit 58ce13cfab
2 changed files with 101 additions and 125 deletions

Binary file not shown.


@@ -96,10 +96,10 @@ Section \ref{sec:problem-definition}. Next, we detail system architecture from
the point of view of how data flows through the system in Section
\ref{sec:architecture}. We then discuss how and why data gets converted into a
binary format in Section \ref{sec:storage-format}. We briefly describe the
query API in Section \ref{sec:query-api} and present our experimental results
in Section \ref{sec:benchmarks}. Lastly, we leave off with what we've learned from
running Druid in production in Section \ref{sec:production}, related work
in Section \ref{sec:related}, and conclusions in Section \ref{sec:conclusions}.
query API in Section \ref{sec:query-api} and present performance results
in Section \ref{sec:benchmarks}. Lastly, we leave off with our lessons from
running Druid in production in Section \ref{sec:production}, and related work
in Section \ref{sec:related}.
\section{Problem Definition}
\label{sec:problem-definition}
@@ -174,14 +174,14 @@ analytics platform in multiple companies.
\label{sec:architecture}
A Druid cluster consists of different types of nodes and each node type is
designed to perform a specific set of tasks. We believe this design separates
concerns and simplifies the complexity of the system. The different node types
concerns and simplifies the complexity of the system. The different node types
operate fairly independently of each other and there is minimal interaction
between them. Hence, intra-cluster communication failures have minimal impact
on data availability. To solve complex data analysis problems, the different
node types come together to form a fully working system. The name Druid comes
from the Druid class in many role-playing games: it is a shape-shifter, capable
of taking on many different forms to fulfill various different roles in a
group. The composition of and flow of data in a Druid cluster are shown in
group. The composition of and flow of data in a Druid cluster are shown in
Figure~\ref{fig:cluster}.
\begin{figure*}
@@ -206,7 +206,7 @@ Zookeeper.
Real-time nodes maintain an in-memory index buffer for all incoming events.
These indexes are incrementally populated as new events are ingested and the
indexes are also directly queryable. Druid virtually behaves as a row store
indexes are also directly queryable. Druid behaves as a row store
for queries on events that exist in this JVM heap-based buffer. To avoid heap
overflow problems, real-time nodes persist their in-memory indexes to disk
either periodically or after some maximum row limit is reached. This persist
@@ -218,10 +218,10 @@ in \cite{o1996log} and is illustrated in Figure~\ref{fig:realtime_flow}.
\begin{figure}
\centering
\includegraphics[width = 2.8in]{realtime_flow}
\includegraphics[width = 2.6in]{realtime_flow}
\caption{Real-time nodes first buffer events in memory. On a periodic basis,
the in-memory index is persisted to disk. On another periodic basis, all
persisted indexes are merged together and handed off. Queries for data will hit the
persisted indexes are merged together and handed off. Queries will hit the
in-memory index and the persisted indexes.}
\label{fig:realtime_flow}
\end{figure}
@@ -237,25 +237,23 @@ file system such as S3 \cite{decandia2007dynamo} or HDFS
persist, merge, and handoff steps are fluid; there is no data loss during any
of the processes.
To better understand the flow of data through a real-time node, consider the
following example. First, we start a real-time node at 13:37. The node will
only accept events for the current hour or the next hour. When the node begins
ingesting events, it will announce that it is serving a segment of data for a
time window from 13:00 to 14:00. Every 10 minutes (the persist period is
configurable), the node will flush and persist its in-memory buffer to disk.
Near the end of the hour, the node will likely see events with timestamps from
14:00 to 15:00. When this occurs, the node prepares to serve data for the next
hour and creates a new in-memory index. The node then announces that it is also
serving a segment for data from 14:00 to 15:00. The node does not immediately
merge the indexes it persisted from 13:00 to 14:00; instead, it waits for a
configurable window period for straggling events from 13:00 to 14:00 to come
in. Having a window period minimizes the risk of data loss from delays in event
delivery. At the end of the window period, the real-time node merges all
persisted indexes from 13:00 to 14:00 into a single immutable segment and hands
the segment off. Once this segment is loaded and queryable somewhere else in
the Druid cluster, the real-time node flushes all information about the data it
collected for 13:00 to 14:00 and unannounces it is serving this data. This
process is shown in Figure~\ref{fig:realtime_timeline}.
Figure~\ref{fig:realtime_timeline} illustrates the operations of a real-time
node. The node starts at 13:37 and will only accept events for the current hour
or the next hour. When events are ingested, the node announces that it is
serving a segment of data for an interval from 13:00 to 14:00. Every 10
minutes (the persist period is configurable), the node will flush and persist
its in-memory buffer to disk. Near the end of the hour, the node will likely
see events for 14:00 to 15:00. When this occurs, the node prepares to serve
data for the next hour and creates a new in-memory index. The node then
announces that it is also serving a segment from 14:00 to 15:00. The node does
not immediately merge persisted indexes from 13:00 to 14:00; instead, it waits
for a configurable window period for straggling events from 13:00 to 14:00 to
arrive. This window period minimizes the risk of data loss from delays in event
delivery. At the end of the window period, the node merges all persisted
indexes from 13:00 to 14:00 into a single immutable segment and hands the
segment off. Once this segment is loaded and queryable somewhere else in the
Druid cluster, the real-time node flushes all information about the data it
collected for 13:00 to 14:00 and unannounces it is serving this data.
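As a rough, standalone sketch of this timeline (the date, the 10-minute
persist and window periods, and the class and variable names are illustrative
choices of ours, not Druid's implementation), the key points in the lifecycle
can be computed as follows:
\begin{verbatim}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Sketch of the timeline above; periods and
// names are illustrative, not Druid's code.
public class RealtimeTimeline {
  public static void main(String[] args) {
    LocalDateTime start =
        LocalDateTime.parse("2014-01-01T13:37:00");
    Duration persistPeriod = Duration.ofMinutes(10);
    Duration windowPeriod = Duration.ofMinutes(10);

    // The node serves the hour containing its
    // start time: [13:00, 14:00).
    LocalDateTime intervalStart =
        start.truncatedTo(ChronoUnit.HOURS);
    LocalDateTime intervalEnd =
        intervalStart.plusHours(1);

    // The in-memory buffer is flushed to disk
    // every persist period.
    LocalDateTime firstPersist =
        start.plus(persistPeriod);

    // Persisted indexes for the hour are merged
    // and handed off once the window period for
    // straggling events has elapsed.
    LocalDateTime handOff =
        intervalEnd.plus(windowPeriod);

    System.out.println(intervalStart + " to "
        + intervalEnd);
    System.out.println("first persist: "
        + firstPersist);
    System.out.println("handoff after: " + handOff);
  }
}
\end{verbatim}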
\begin{figure*}
\centering
@@ -284,26 +282,26 @@ milliseconds.
The purpose of the message bus in Figure~\ref{fig:realtime_pipeline} is
two-fold. First, the message bus acts as a buffer for incoming events. A
message bus such as Kafka maintains offsets indicating the position in an event
stream that a consumer (a real-time node) has read up to and consumers can
programmatically update these offsets. Typically, real-time nodes update this
offset each time they persist their in-memory buffers to disk. In a fail and
recover scenario, if a node has not lost disk, it can reload all persisted
indexes from disk and continue reading events from the last offset it
committed. Ingesting events from a recently committed offset greatly reduces a
node's recovery time. In practice, we see real-time nodes recover from such
failure scenarios in an order of seconds.
message bus such as Kafka maintains positional offsets indicating how far a
consumer (a real-time node) has read in an event stream. Consumers can
programmatically update these offsets. Real-time nodes update this offset each
time they persist their in-memory buffers to disk. In a fail and recover
scenario, if a node has not lost disk, it can reload all persisted indexes from
disk and continue reading events from the last offset it committed. Ingesting
events from a recently committed offset greatly reduces a node's recovery time.
In practice, we see nodes recover from such failure scenarios in a
few seconds.
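A minimal sketch of this persist-then-commit pattern is shown below, written
against the Kafka Java consumer with automatic offset commits disabled. The
topic name, consumer group, buffer threshold, and persist routine are
illustrative assumptions, not Druid's actual ingestion code.
\begin{verbatim}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;

// Sketch only: topic, group id, threshold, and
// persist() are illustrative, not Druid's code.
public class PersistThenCommit {
  public static void main(String[] args) {
    Properties p = new Properties();
    p.put("bootstrap.servers", "localhost:9092");
    p.put("group.id", "realtime-node");
    p.put("enable.auto.commit", "false");
    p.put("key.deserializer",
        "org.apache.kafka.common.serialization."
        + "StringDeserializer");
    p.put("value.deserializer",
        "org.apache.kafka.common.serialization."
        + "StringDeserializer");
    try (KafkaConsumer<String, String> consumer =
             new KafkaConsumer<>(p)) {
      consumer.subscribe(List.of("events"));
      StringBuilder buffer = new StringBuilder();
      while (true) {
        for (ConsumerRecord<String, String> r :
             consumer.poll(Duration.ofSeconds(1)))
          buffer.append(r.value()).append('\n');
        if (buffer.length() > 1_000_000) {
          persist(buffer.toString()); // flush to disk
          buffer.setLength(0);
          // Only now record how far we have read;
          // on restart we reload persisted indexes
          // and resume from this offset.
          consumer.commitSync();
        }
      }
    }
  }
  static void persist(String data) {
    /* write an index to disk */
  }
}
\end{verbatim}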
The second purpose of the message bus is to act as a single endpoint from which
multiple real-time nodes can read events. Multiple real-time nodes can ingest
the same set of events from the bus, thus creating a replication of events. In
a scenario where a node completely fails and does not recover, replicated
streams ensure that no data is lost. A single ingestion endpoint also allows
for data streams to be partitioned such that multiple real-time nodes each
ingest a portion of a stream. This allows additional real-time nodes to be
seamlessly added. In practice, this model has allowed one of the largest
production Druid clusters to be able to consume raw data at approximately 500
MB/s (150,000 events/s or 2 TB/hour).
the same set of events from the bus, thereby replicating the events. In a
scenario where a node completely fails and loses its disk, replicated streams
ensure that no data is lost. A single ingestion endpoint also allows data
streams to be partitioned such that multiple real-time nodes each ingest a
portion of a stream, which lets additional real-time nodes be added
seamlessly. In practice, this model has allowed one of the largest production
Druid clusters to consume raw data at approximately 500 MB/s (150,000
events/s or 2 TB/hour).
\subsection{Historical Nodes}
Historical nodes encapsulate the functionality to load and serve the immutable
@@ -506,9 +504,7 @@ Druid always requires a timestamp column as a method of simplifying data
distribution policies, data retention policies, and first-level query pruning.
Druid partitions its data sources into well-defined time intervals, typically
an hour or a day, and may further partition on values from other columns to
achieve the desired segment size. For example, partitioning the data in
Table~\ref{tab:sample_data} by hour results in two segments for 2011-01-01, and
partitioning the data by day results in a single segment. The time granularity
achieve the desired segment size. The time granularity
to partition segments is a function of data volume and time range. A data set
with timestamps spread over a year is better partitioned by day, and a data set
with timestamps spread over a day is better partitioned by hour.
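The following self-contained sketch illustrates this first-level partitioning
by truncating event timestamps to a segment interval; the timestamps are made
up, and the bucketing shown here covers only the time component of Druid's
partitioning.
\begin{verbatim}
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Sketch of first-level time partitioning: each
// row is assigned to the segment interval that
// its timestamp truncates to. Timestamps here
// are illustrative.
public class TimePartition {
  public static void main(String[] args) {
    LocalDateTime t1 =
        LocalDateTime.parse("2011-01-01T01:00:00");
    LocalDateTime t2 =
        LocalDateTime.parse("2011-01-01T11:00:00");
    // Hourly granularity: the two rows land in
    // different segments.
    System.out.println(
        t1.truncatedTo(ChronoUnit.HOURS));
    System.out.println(
        t2.truncatedTo(ChronoUnit.HOURS));
    // Daily granularity: both rows share one
    // segment for 2011-01-01.
    System.out.println(
        t1.truncatedTo(ChronoUnit.DAYS));
    System.out.println(
        t2.truncatedTo(ChronoUnit.DAYS));
  }
}
\end{verbatim}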
@@ -610,31 +606,24 @@ the two arrays.
This approach of performing Boolean operations on large bitmap sets is commonly
used in search engines. Bitmap indices for OLAP workloads are described in
detail in \cite{o1997improved}. Bitmap compression algorithms are a
well-defined area of research and often utilize run-length encoding. Popular
algorithms include Byte-aligned Bitmap Code \cite{antoshenkov1995byte},
Word-Aligned Hybrid (WAH) code \cite{wu2006optimizing}, and Partitioned
Word-Aligned Hybrid (PWAH) compression \cite{van2011memory}. Druid opted to use
the Concise algorithm \cite{colantonio2010concise} as it can outperform WAH by
reducing the size of the compressed bitmaps by up to 50\%.
Figure~\ref{fig:concise_plot} illustrates the number of bytes using Concise
compression versus using an integer array. The results were generated on a
cc2.8xlarge system with a single thread, 2G heap, 512m young gen, and a forced
GC between each run. The data set is a single day's worth of data collected
from the Twitter garden hose \cite{twitter2013} data stream. The data set
contains 2,272,295 rows and 12 dimensions of varying cardinality. As an
additional comparison, we also resorted the data set rows to maximize
compression.
well-defined area of research \cite{antoshenkov1995byte, wu2006optimizing,
van2011memory} and often utilize run-length encoding. Druid opted to use the
Concise algorithm \cite{colantonio2010concise} as it can outperform WAH by
reducing compressed bitmap size by up to 50\%. Figure~\ref{fig:concise_plot}
illustrates the number of bytes using Concise compression versus using an
integer array. The results were generated on a cc2.8xlarge system with a single
thread, 2G heap, 512m young gen, and a forced GC between each run. The data set
is a single day's worth of data collected from the Twitter garden hose
\cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12
dimensions of varying cardinality. As an additional comparison, we also
re-sorted the data set rows to maximize compression.
In the unsorted case, the total Concise size was 53,451,144 bytes and the total
integer array size was 127,248,520 bytes. Overall, the Concise compressed sets
are about 42\% of the size of the integer arrays. In the sorted case, the total Concise
compressed size was 43,832,884 bytes and the total integer array size was
127,248,520 bytes. What is interesting to note is that after sorting, global
compression only increased minimally. The total Concise set size to total
integer array size is 34\%. It is also interesting to note that as the
cardinality of a dimension approaches the total number of rows in a data set,
integer arrays require less space than Concise sets and become a better
alternative.
compression only increased minimally.
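To make the Boolean operations concrete, the sketch below evaluates AND and OR
filters over per-value bitmaps using uncompressed java.util.BitSet; Druid
stores such bitmaps Concise-compressed, and the country dimension used here is
purely illustrative.
\begin{verbatim}
import java.util.BitSet;

// Each dimension value maps to a bitmap of the
// row offsets containing that value. Filters are
// evaluated with Boolean operations on bitmaps.
// Druid stores these Concise-compressed; BitSet
// is used here only for illustration.
public class BitmapFilter {
  public static void main(String[] args) {
    BitSet kesha = new BitSet();  // page = Ke$ha
    kesha.set(0); kesha.set(2); kesha.set(5);
    BitSet us = new BitSet();     // country = US
    us.set(1); us.set(2); us.set(5);

    BitSet and = (BitSet) kesha.clone();
    and.and(us);   // page = Ke$ha AND country = US
    BitSet or = (BitSet) kesha.clone();
    or.or(us);     // page = Ke$ha OR country = US

    System.out.println(and);  // {2, 5}
    System.out.println(or);   // {0, 1, 2, 5}
  }
}
\end{verbatim}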
\subsection{Storage Engine}
Druid's persistence components allow for different storage engines to be
@@ -675,22 +664,18 @@ into data at any depth.
The exact query syntax depends on the query type and the information requested.
A sample count query over a week of data is as follows:
\newpage
\begin{verbatim}
{
"queryType" : "timeseries",
"dataSource" : "wikipedia",
"intervals" : "2013-01-01/2013-01-08",
"filter" : {
"type" : "selector",
"dimension" : "page",
"value" : "Ke$ha"
},
"type":"selector",
"dimension":"page",
"value":"Ke$ha"
},
"granularity" : "day",
"aggregations" : [ {
"type" : "count",
"name" : "rows"
} ]
"aggregations" : [ {"type":"count", "name":"rows"} ]
}
\end{verbatim}
The query shown above will return a count of the number of rows in the Wikipedia datasource
@@ -699,22 +684,16 @@ equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array
\begin{verbatim}
[ {
"timestamp": "2012-01-01T00:00:00.000Z",
"result": {
"rows": 393298
}
"result": {"rows":393298}
},
{
"timestamp": "2012-01-02T00:00:00.000Z",
"result": {
"rows": 382932
}
"result": {"rows":382932}
},
...
{
"timestamp": "2012-01-07T00:00:00.000Z",
"result": {
"rows": 1337
}
"result": {"rows": 1337}
} ]
\end{verbatim}
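As a usage sketch, a query such as the one above can be posted to a Druid node
over HTTP. The example below uses Java's built-in HTTP client; the broker
host, port, and /druid/v2 path are assumptions about a typical deployment
rather than details given in this paper.
\begin{verbatim}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch of posting the timeseries query shown
// above. The broker host, port, and /druid/v2
// path are assumptions about a typical
// deployment, not details from this paper.
public class QueryExample {
  public static void main(String[] args)
      throws Exception {
    String query = """
      {"queryType":"timeseries",
       "dataSource":"wikipedia",
       "intervals":"2013-01-01/2013-01-08",
       "filter":{"type":"selector",
                 "dimension":"page",
                 "value":"Ke$ha"},
       "granularity":"day",
       "aggregations":[
         {"type":"count","name":"rows"}]}""";
    HttpRequest req = HttpRequest.newBuilder()
        .uri(URI.create(
            "http://broker-host:8082/druid/v2"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers
            .ofString(query))
        .build();
    HttpResponse<String> resp =
        HttpClient.newHttpClient().send(req,
            HttpResponse.BodyHandlers.ofString());
    System.out.println(resp.body());
  }
}
\end{verbatim}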
Druid supports many types of aggregations including double sums, long sums,
@@ -733,7 +712,6 @@ workloads that must return in a matter of seconds, and as such, we've chosen
not to spend the time implementing joins; in our experience, requiring joins
often limits the query performance that can be achieved.
\newpage
\section{Performance}
\label{sec:benchmarks}
Druid runs in production at several organizations, and to demonstrate its
@@ -916,41 +894,42 @@ on an Amazon EC2 cc2.8xlarge. The data source had 30 dimensions and 19 metrics.
\section{Druid in Production}
\label{sec:production}
Over the last few years, we've gained tremendous
knowledge about handling production workloads, setting up correct operational
monitoring, integrating Druid with other products as part of a more
sophisticated data analytics stack, and distributing data to handle entire data
center outages. One of the most important lessons we've learned is that no
amount of testing can accurately simulate a production environment, and failures
will occur for every imaginable and unimaginable reason. Interestingly, most of
our most severe crashes were due to misunderstanding the impacts a
seemingly small feature would have on the overall system.
Some of our more interesting observations include:
Over the last few years, we've gained tremendous knowledge about handling
production workloads with Druid. Some of our more interesting observations include:
\begin{itemize}
\item Druid is often used in production to power exploratory dashboards. Many
users of exploratory dashboards are not from technical backgrounds, and they
often issue queries without understanding the impacts to the underlying system.
For example, some users become impatient that their queries for terabytes of
data do not return in milliseconds and continuously refresh their dashboard
view, generating heavy load to Druid. This type of usage forced Druid to defend
itself against expensive repetitive queries.
\item Druid is often used to explore data and to generate reports. In the
explore use case, the number of queries issued by a single user is much higher
than in the reporting use case. Exploratory queries often involve progressively
adding filters for the same time range to narrow down results. Users tend to
explore short time intervals of recent data. In the reporting use case, users
query for a much larger data interval, but already have a set of queries in
mind.
\item Cluster query performance benefits from multitenancy. Hosting every
production datasource in the same cluster leads to better data parallelization
as additional nodes are added.
\item Expensive concurrent queries can be problematic in a multitenant
environment. Queries for large datasources may end up hitting every historical
node in a cluster and consume all cluster resources. Smaller, cheaper queries
may be blocked from executing in such cases. We introduced query prioritization
to address these issues. Each historical node is able to prioritize which
segments it needs to scan. Proper query planning is critical for production
workloads. Thankfully, queries for a significant amount of data tend to be for
reporting use cases, and users are not expecting the same level of
interactivity as when they are querying to explore data.
\item Even if you provide users with the ability to arbitrarily explore data,
they often only have a few questions in mind. Caching is extremely important in
this case, and we see very high cache hit rates.
\item Node failures are common in a distributed environment, but many nodes
failing at once is not. If historical nodes fail and do not recover, their
segments need to be reassigned, which means we need excess cluster capacity to
load this data. How much additional capacity to keep available at any time is
largely a question of cost. It is extremely rare to see more than 2 nodes fail
at once and never recover, and hence we leave enough capacity to completely
reassign the data from 2 historical nodes.
\item When using a memory mapped storage engine, even a small amount of paging
data from disk can severely impact query performance. SSDs greatly mitigate
this problem.
\item Leveraging approximate algorithms can greatly reduce data storage costs and
improve query performance. Many users do not care about exact answers to their
questions and are comfortable with a few percentage points of error.
\item Complete cluster failures are possible, but extremely rare. When running
in a single data center, it is possible for the entire data center to fail. In
such a case, a new cluster needs to be created. As long as deep storage is
available, cluster recovery time is network bound. Historical nodes need to
reload every segment from deep storage. We have experienced such a failure in
the past; with several TBs of data, it took several hours for our entire Druid
cluster to recover.
\end{itemize}
\subsection{Operational Monitoring}
@@ -959,18 +938,15 @@ Each Druid node is designed to periodically emit a set of operational metrics.
These metrics may include system-level data such as CPU usage, available
memory, and disk capacity; JVM statistics such as garbage collection time and
heap usage; or node-specific metrics such as segment scan time, cache
hit rates, and data ingestion latencies. For each query, Druid nodes can also
emit metrics about the details of the query such as the number of filters
applied, or the interval of data requested.
hit rates, and data ingestion latencies. Druid also emits per-query metrics.
Metrics can be emitted from a production Druid cluster into a dedicated metrics
Druid cluster. Queries can be made to the metrics Druid cluster to explore
production cluster performance and stability. Leveraging a dedicated metrics
We emit metrics from a production Druid cluster and load them into a dedicated
metrics Druid cluster. The metrics Druid cluster is used to explore the
performance and stability of the production cluster. This dedicated metrics
cluster has allowed us to find numerous production problems, such as gradual
query speed degradations, less than optimally tuned hardware, and various other
system bottlenecks. We also use a metrics cluster to analyze what queries are
made in production. This analysis allows us to determine what our users are
most often doing and we use this information to drive our road map.
made in production and what users are most interested in.
\subsection{Pairing Druid with a Stream Processor}
At the time of writing, Druid can only understand fully denormalized data