mirror of https://github.com/apache/druid.git
add more details to paper
This commit is contained in:
parent
d8a1dd8d68
commit
89785b2f52
|
@ -19,7 +19,7 @@ Druid currently allows for single-table queries in a similar manner to [Dremel](
|
|||
|
||||
As far as a comparison of systems is concerned, Druid sits in between PowerDrill and Dremel on the spectrum of functionality. It implements almost everything Dremel offers (Dremel handles arbitrary nested data structures while Druid only allows for a single level of array-based nesting) and gets into some of the interesting data layout and compression methods from PowerDrill.
|
||||
|
||||
Druid is a good fit for products that require real-time data ingestion of a single, large data stream. Especially if you are targeting no-downtime operation and are building your product on top of a time-oriented summarization of the incoming data stream. Druid is probably not the right solution if you care more about query flexibility and raw data access than query speed and no-downtime operation. When talking about query speed it is important to clarify what "fast" means: with Druid it is entirely within the realm of possibility (we have done it) to achieve queries that run in single-digit seconds across a 6TB data set.
|
||||
Druid is a good fit for products that require real-time data ingestion of a single, large data stream. Especially if you are targeting no-downtime operation and are building your product on top of a time-oriented summarization of the incoming data stream. Druid is probably not the right solution if you care more about query flexibility and raw data access than query speed and no-downtime operation. When talking about query speed it is important to clarify what "fast" means: with Druid it is entirely within the realm of possibility (we have done it) to achieve queries that run in less than a second across terabytes of data.
|
||||
|
||||
### Architecture
|
||||
|
||||
|
|
Binary file not shown.
After Width: | Height: | Size: 191 KiB |
Binary file not shown.
After Width: | Height: | Size: 164 KiB |
Binary file not shown.
After Width: | Height: | Size: 392 KiB |
Binary file not shown.
After Width: | Height: | Size: 180 KiB |
Binary file not shown.
|
@ -66,13 +66,13 @@
|
|||
% 1st. author
|
||||
\alignauthor
|
||||
Fangjin Yang\\
|
||||
\affaddr{Metamarkets Group, Inc.}\\
|
||||
\email{fangjin@metamarkets.com}
|
||||
\affaddr{Imply Data, Inc.}\\
|
||||
\email{fj@imply.io}
|
||||
% 2nd. author
|
||||
\alignauthor
|
||||
Gian Merlino\\
|
||||
\affaddr{Metamarkets Group, Inc.}\\
|
||||
\email{gian@metamarkets.com}
|
||||
\affaddr{Imply Data, Inc.}\\
|
||||
\email{gian@imply.io}
|
||||
% 3rd. author
|
||||
\alignauthor
|
||||
Xavier Léauté\\
|
||||
|
@ -85,7 +85,7 @@ Xavier Léauté\\
|
|||
% in the \additional authors block, viz.
|
||||
\additionalauthors{Nishant Bangarwa,
|
||||
({\texttt{nishant@metamarkets.com}}), Eric Tschetter
|
||||
({\texttt{echeddar@gmail.com}})}
|
||||
({\texttt{cheddar@yahoo-inc.com}})}
|
||||
% Just remember to make sure that the TOTAL number of authors
|
||||
% is the number that will appear on the first page PLUS the
|
||||
% number that will appear in the \additionalauthors section.
|
||||
|
@ -284,7 +284,7 @@ We next evaluated HBase \cite{george2011hbase}, a NoSQL key/value store. As is
|
|||
common with using key/value stores, we precomputed the total set of queries we
|
||||
anticipated users would make. An example of this precomputation is shown in
|
||||
Figure~\ref{fig:precompute}. Our results with HBase are shown in
|
||||
Table~\ref{tab:hbase_results} for 500, 000 records.
|
||||
Table~\ref{tab:hbase_results} for 500,000 records.
|
||||
Queries were acceptably fast with this solution as we were effectively doing
|
||||
O(1) lookups into maps. However, the solution was not particularly flexible;
|
||||
if something wasn't precomputed, it wasn’t queryable, and the precomputation
|
||||
|
@ -309,7 +309,7 @@ data transformations, such as id to name lookups, up to complex operations such
|
|||
as multi-stream joins. Pairing Druid with a stream processor enabled flexible
|
||||
data processing and querying, but we still had problems with event delivery.
|
||||
Our events were delivered from many different locations and sources, and peaked
|
||||
at several hundred thousand events per second. We required a high throughput
|
||||
at several million events per second. We required a high throughput
|
||||
message bus that could hold these events for consumpation by our stream
|
||||
processor. To simplify data transmission for our clients, we wanted the
|
||||
message bus to be the single delivery endpoint for events entering our cluster.
|
||||
|
@ -425,16 +425,16 @@ some period of time. An example query for Table~\ref{tab:sample_data} may ask:
|
|||
bieberfever.com and ultratrimfast.com?”.
|
||||
|
||||
Consider the publisher column in Table~\ref{tab:sample_data}, a string column.
|
||||
For each unique publisher in Table 1, our inverted index tells us in which
|
||||
table rows a particular page is seen. Our inverted index looks like the
|
||||
following:
|
||||
For each unique publisher in Table 1, we can generate an inverted index that
|
||||
tells us in which table rows a particular page is seen. Our inverted index
|
||||
looks like the following:
|
||||
|
||||
{\small\begin{verbatim}
|
||||
bieberfever.com -> rows [0, 1, 2] -> [1][1][1][0][0][0]
|
||||
ultratrimfast.com -> rows [3, 4, 5] -> [0][0][0][1][1][1]
|
||||
\end{verbatim}}
|
||||
|
||||
In the binary array, the array indices represent our rows, and the array values
|
||||
In the inverted index, the array indices represent our rows, and the array values
|
||||
indicate whether a particular value was seen. In our example, bieberfever.com
|
||||
is seen in rows 0, 1 and 2. To know which rows contain bieberfever.com or
|
||||
ultratrimfast.com, we can OR together the two arrays.
|
||||
|
@ -483,7 +483,7 @@ both the in-memory and persisted indexes.
|
|||
|
||||
Real-time nodes employ a log structured merge tree\cite{o1996log} for recently
|
||||
ingested data. Incoming events are first stored in an in-memory buffer. The
|
||||
in-memory buffer is directly queryable and Druid behaves as a row store for
|
||||
in-memory buffer is directly queryable and Druid behaves as a key/value store for
|
||||
queries on events that exist in this JVM heap-based store. The in-memory buffer
|
||||
is heavily write optimized, and given that Druid is really designed for heavy
|
||||
concurrent reads, events do not remain in the in-memory buffer for very long.
|
||||
|
@ -567,7 +567,7 @@ queryable throughout the entire handoff process. Segments created by real-time
|
|||
processing are versioned by the start of the segment granularity interval.
|
||||
|
||||
\subsection{Batch Data Ingestion}
|
||||
The core component used by real-time ingestion is an index that can be
|
||||
The core component used by real-time ingestion is a hash map that can be
|
||||
incrementally populated and finalized to create an immutable segment. This core
|
||||
component is shared across both real-time and batch ingestion. Druid has built
|
||||
in support for creating segments by leveraging Hadoop and running MapReduce
|
||||
|
@ -643,41 +643,6 @@ cluster. If a segment is completely overshadowed by one or more segments, it
|
|||
will be flagged in this timeline. When the coordinator notices overshadowed
|
||||
segments, it tells historical nodes to drop these segments from the cluster.
|
||||
|
||||
\subsection{Queries}
|
||||
Druid has its own query language and accepts queries as POST requests. Broker,
|
||||
historical, and real-time nodes all share the same query API.
|
||||
|
||||
The body of the POST request is a JSON object containing key/value pairs
|
||||
specifying various query parameters. A typical query will contain the data
|
||||
source name, the granularity of the result data, time range of interest, the
|
||||
type of request, and the metrics to aggregate over. The result will also be a
|
||||
JSON object containing the aggregated metrics over the time period.
|
||||
|
||||
Most query types will also support a filter set. A filter set is a Boolean
|
||||
expression of dimension name and value pairs. Any number and combination of
|
||||
dimensions and values may be specified. When a filter set is provided, only the
|
||||
subset of the data that pertains to the filter set will be scanned. The ability
|
||||
to handle complex nested filter sets is what enables Druid to drill into data
|
||||
at any depth.
|
||||
|
||||
Druid supports many types of aggregations including sums on floating-point and
|
||||
integer types, minimums, maximums, and complex aggregations such as cardinality
|
||||
estimation and approximate quantile estimation. The results of aggregations can
|
||||
be combined in mathematical expressions to form other aggregations. It is
|
||||
beyond the scope of this paper to fully describe the query API but more
|
||||
information can be found
|
||||
online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}.
|
||||
|
||||
As of this writing, a join query for Druid is not yet implemented. Although
|
||||
Druid’s storage format would allow for the implementation of joins (there is no
|
||||
loss of fidelity for columns included as dimensions), implementation time is
|
||||
costly and a strong enough use case has not yet arisen. When all sides of the
|
||||
join are significantly large tables (> 1 billion records), materializing the
|
||||
pre-join streams requires complex distributed memory management. The complexity
|
||||
of the memory management is only amplified by the fact that we are targeting
|
||||
highly concurrent, multi-tenant workloads. For these reasons, we’ve elected to
|
||||
do joins at the processing layer for the time being.
|
||||
|
||||
\section{The Processing Layer}
|
||||
\label{sec:processing}
|
||||
Although Druid can ingest events that are streamed in one at a time, data must
|
||||
|
@ -707,20 +672,87 @@ involve operations such as renaming data, inserting default values for nulls
|
|||
and empty strings, and filtering data. One pipeline may write to many data
|
||||
sources in Druid.
|
||||
|
||||
Given that Druid does not support joins in queries, we require supporting joins
|
||||
at the data processing level. Our approach to do streaming joins is to buffer
|
||||
events for a configurable period of time. If an event arrives in the system
|
||||
with a join key that exists in the buffer, the join occurs and the joined event
|
||||
is transmitted further down the pipeline. If events are substantially delayed
|
||||
and do not arrive in the allocated window period, they will not be joined. In
|
||||
practice, this generally leads to one “primary” event continuing through the
|
||||
pipeline and other secondary events with the same join key getting dropped.
|
||||
This means that our stream processing layer is not guaranteed to deliver 100\%
|
||||
accurate results. Furthermore, even without this restriction, Samza does not
|
||||
offer exactly-once processing semantics. Problems in network connectivity or
|
||||
node failure can lead to duplicated events. For these reasons, we run a
|
||||
separate batch pipeline that generates a more accurate transformation of the
|
||||
ingested data.
|
||||
To understand a real-world pipeline, let's consider an example from online
|
||||
advertising. In online advertising, events are generated by impressions (views)
|
||||
of an ad and clicks of an ad. Many advertisers are interested in knowing how
|
||||
many impressions of an ad converted into clicks. Impression streams and click
|
||||
streams are often recorded as separate streams by ad servers and need to be
|
||||
joined. An example of data generated by these two event streams is shown in
|
||||
Figure~\ref{fig:imps_clicks}. Every event has a unique id or key that
|
||||
identifies the ad served. We use this id as our join key.
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.6in]{imps_clicks}
|
||||
\caption{
|
||||
Ad impressions
|
||||
and clicks are recorded in two separate streams. An event we want to join is
|
||||
located in two different Kafka partitions on two different topics.
|
||||
}
|
||||
\label{fig:imps_clicks}
|
||||
\end{figure}
|
||||
|
||||
Given that Druid does not support joins in queries, we need to do this join at
|
||||
the data processing level. Our approach to do streaming joins is to buffer
|
||||
events for a configurable period of time. We can leverage any key/value
|
||||
database as the buffer, although we prefer one that has relatively high read
|
||||
and write throughput. Events are typically buffered for 30 minutes. Once an
|
||||
event arrives in the system with a join key that exists in the buffer, we
|
||||
perform the first operation in our data pipeline: shuffling. A shuffle
|
||||
operation writes events from our impressions and clicks streams to Kafka such
|
||||
that the events that need to be joined are written to the same Kafka partition.
|
||||
This is shown in Figure~\ref{fig:shuffled}.
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.6in]{shuffled}
|
||||
\caption{
|
||||
A shuffle operation ensures events to be joined at stored in the same Kafka
|
||||
partition.
|
||||
}
|
||||
\label{fig:shuffled}
|
||||
\end{figure}
|
||||
|
||||
The next stage in the data pipeline is to actually join the impression and
|
||||
click. This is done by creating a new field in the data, called "is\_clicked".
|
||||
This field is marked as "true" if a successful join occurs. This is shown in
|
||||
Figure~\ref{fig:joined}
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.6in]{joined}
|
||||
\caption{
|
||||
The join operation adds a new field, "is\_clicked".
|
||||
}
|
||||
\label{fig:joined}
|
||||
\end{figure}
|
||||
|
||||
The final stage of our data processing is to enhance the data. This stage
|
||||
cleans up faults in data, and performs lookups and transforms of events. Once
|
||||
data is cleaned, it is ready to be delivered to Druid for queries. The total
|
||||
streaming data processing pipeline is shown in Figure~\ref{fig:pipeline}.
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.6in]{pipeline}
|
||||
\caption{
|
||||
The streaming processing data pipeline.
|
||||
}
|
||||
\label{fig:pipeline}
|
||||
\end{figure}
|
||||
|
||||
The system we have designed is not perfect. Because we are doing windowed joins
|
||||
and because events cannot be buffered indefinitely, not all joins are
|
||||
guaranteed to complete. If events are substantially delayed and do not arrive
|
||||
in the allocated window period, they will not be joined. In practice, this
|
||||
generally leads to one “primary” event continuing through the pipeline and
|
||||
other secondary events with the same join key getting dropped. This means that
|
||||
our stream processing layer is not guaranteed to deliver 100\% accurate
|
||||
results. Furthermore, even without this restriction, Samza does not offer
|
||||
exactly-once processing semantics. Problems in network connectivity or node
|
||||
failure can lead to duplicated events. For these reasons, we run a separate
|
||||
batch pipeline that generates a more accurate transformation of the ingested
|
||||
data.
|
||||
|
||||
The final job of our processing pipeline is to deliver data to Druid. For high
|
||||
availability, processed events from Samza are transmitted concurrently to two
|
||||
|
@ -830,38 +862,6 @@ To showcase the average query latencies in a production Druid cluster, we
|
|||
selected 8 of our most queried data sources, described in
|
||||
Table~\ref{tab:datasources}.
|
||||
|
||||
Approximately 30\% of queries are standard aggregates involving different types
|
||||
of metrics and filters, 60\% of queries are ordered group bys over one or more
|
||||
dimensions with aggregates, and 10\% of queries are search queries and metadata
|
||||
retrieval queries. The number of columns scanned in aggregate queries roughly
|
||||
follows an exponential distribution. Queries involving a single column are very
|
||||
frequent, and queries involving all columns are very rare.
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.3in]{avg_query_latency}
|
||||
\includegraphics[width = 2.3in]{query_percentiles}
|
||||
\caption{Query latencies of production data sources.}
|
||||
\label{fig:query_latency}
|
||||
\end{figure}
|
||||
\begin{itemize}[leftmargin=*,beginpenalty=5000,topsep=0pt]
|
||||
|
||||
\item There were
|
||||
approximately 50 total data sources in this particular cluster and several hundred users issuing
|
||||
queries.
|
||||
|
||||
\item There was approximately 10.5TB of RAM available in this cluster and
|
||||
approximately 10TB of segments loaded. Collectively,
|
||||
there are about 50 billion Druid rows in this cluster. Results for
|
||||
every data source is not shown.
|
||||
|
||||
\item This cluster uses Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2670 processors and consists of 1302 processing
|
||||
threads and 672 total cores (hyperthreaded).
|
||||
|
||||
\item A memory-mapped storage engine was used (the machine was configured to
|
||||
memory map the data instead of loading it into the Java heap.)
|
||||
\end{itemize}
|
||||
|
||||
\begin{table}
|
||||
\centering
|
||||
\scriptsize\begin{tabular}{| l | l | l |}
|
||||
|
@ -881,6 +881,30 @@ threads and 672 total cores (hyperthreaded).
|
|||
\label{tab:datasources}
|
||||
\end{table}
|
||||
|
||||
Approximately 30\% of queries are standard aggregates involving different types
|
||||
of metrics and filters, 60\% of queries are ordered group bys over one or more
|
||||
dimensions with aggregates, and 10\% of queries are search queries and metadata
|
||||
retrieval queries. The number of columns scanned in aggregate queries roughly
|
||||
follows an exponential distribution. Queries involving a single column are very
|
||||
frequent, and queries involving all columns are very rare.
|
||||
|
||||
\begin{itemize}[leftmargin=*,beginpenalty=5000,topsep=0pt]
|
||||
\item There were
|
||||
approximately 50 total data sources in this particular cluster and several hundred users issuing
|
||||
queries.
|
||||
|
||||
\item There was approximately 10.5TB of RAM available in this cluster and
|
||||
approximately 10TB of segments loaded. Collectively,
|
||||
there are about 50 billion Druid rows in this cluster. Results for
|
||||
every data source is not shown.
|
||||
|
||||
\item This cluster uses Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2670 processors and consists of 1302 processing
|
||||
threads and 672 total cores (hyperthreaded).
|
||||
|
||||
\item A memory-mapped storage engine was used (the machine was configured to
|
||||
memory map the data instead of loading it into the Java heap.)
|
||||
\end{itemize}
|
||||
|
||||
Query latencies are shown in Figure~\ref{fig:query_latency} and the queries per
|
||||
minute are shown in Figure~\ref{fig:queries_per_min}. Across all the various
|
||||
data sources, average query latency is approximately 550 milliseconds, with
|
||||
|
@ -890,6 +914,14 @@ spikes in latency, as observed on February 19, where network issues on
|
|||
the broker nodes were compounded by very high query load on one of our
|
||||
largest data sources.
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.3in]{avg_query_latency}
|
||||
\includegraphics[width = 2.3in]{query_percentiles}
|
||||
\caption{Query latencies of production data sources.}
|
||||
\label{fig:query_latency}
|
||||
\end{figure}
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.8in]{queries_per_min}
|
||||
|
@ -909,6 +941,14 @@ often limited by the time needed for the sequential operations of the system.
|
|||
In this case, queries requiring a substantial amount of work at the broker
|
||||
level do not parallelize as well.
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.3in]{tpch_scaling}
|
||||
\includegraphics[width = 2.3in]{tpch_scaling_factor}
|
||||
\caption{Druid scaling benchmarks -- 100GB TPC-H data.}
|
||||
\label{fig:tpch_scaling}
|
||||
\end{figure}
|
||||
|
||||
Our Druid setup used Amazon EC2 \texttt{m3.2xlarge} instance types
|
||||
(Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered}
|
||||
E5-2680 v2 @ 2.80GHz) for historical nodes and \texttt{c3.2xlarge} instances
|
||||
|
@ -920,14 +960,6 @@ rows/second/core for \texttt{select count(*)} equivalent query over a given
|
|||
time interval and 36,246,530 rows/second/core for a \texttt{select sum(float)}
|
||||
type query.
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
\includegraphics[width = 2.3in]{tpch_scaling}
|
||||
\includegraphics[width = 2.3in]{tpch_scaling_factor}
|
||||
\caption{Druid scaling benchmarks -- 100GB TPC-H data.}
|
||||
\label{fig:tpch_scaling}
|
||||
\end{figure}
|
||||
|
||||
\subsection{Data Ingestion Performance}
|
||||
\begin{table}
|
||||
\centering
|
||||
|
|
Loading…
Reference in New Issue