edits to paper

fjy 2014-02-23 11:36:39 -08:00
parent 699bae8e1b
commit 04ee2505e6
3 changed files with 89 additions and 89 deletions


@ -141,7 +141,7 @@
title = {Introducing Druid: Real-Time Analytics at a Billion Rows Per Second},
month = {April},
year = {2011},
howpublished = "\url{http://metamarkets.com/2011/druid-part-i-real-time-analytics-at-a-billion-rows-per-second/}"
howpublished = "\url{http://druid.io/blog/2011/04/30/introducing-druid.html}"
}
@article{farber2012sap,

Binary file not shown.


@ -96,9 +96,10 @@ Section \ref{sec:problem-definition}. Next, we detail system architecture from
the point of view of how data flows through the system in Section
\ref{sec:architecture}. We then discuss how and why data gets converted into a
binary format in Section \ref{sec:storage-format}. We briefly describe the
query API in Section \ref{sec:query-api} and present our experimental results
in Section \ref{sec:benchmarks}. Lastly, we leave off with our learnings from
running Druid in production in Section \ref{sec:production}, related work
in Section \ref{sec:related}, and conclusions in Section \ref{sec:conclusions}.
\section{Problem Definition}
\label{sec:problem-definition}
@ -139,13 +140,14 @@ want queries over any arbitrary combination of dimensions to return with
sub-second latencies.
The need for Druid was motivated by the fact that existing open source
Relational Database Management Systems (RDBMS) and NoSQL key/value stores were
unable to provide a low latency data ingestion and query platform for
interactive applications \cite{tschetter2011druid}. In the early days of
Metamarkets, we were focused on building a hosted dashboard that would allow
users to arbitrarily explore and visualize event streams. The data store
powering the dashboard needed to return queries fast enough that the data
visualizations built on top of it could provide users with an interactive
experience.
In addition to the query latency needs, the system had to be multi-tenant and
highly available. The Metamarkets product is used in a highly concurrent
@ -188,6 +190,7 @@ Figure~\ref{fig:cluster}.
\label{fig:cluster}
\end{figure*}
\newpage
\subsection{Real-time Nodes}
\label{sec:realtime}
Real-time nodes encapsulate the functionality to ingest and query event
@ -670,7 +673,8 @@ ability to handle complex nested filter sets is what enables Druid to drill
into data at any depth.
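For example, a filter of the following form (the dimension values here are
hypothetical, in the style of the Wikipedia data set used below) combines
Boolean AND, OR, and NOT nodes into a single tree, where each leaf selects on
one dimension:
\begin{verbatim}
{
  "type" : "and",
  "fields" : [ {
    "type" : "selector", "dimension" : "page", "value" : "Ke$ha"
  }, {
    "type" : "or",
    "fields" : [ {
      "type" : "selector", "dimension" : "language", "value" : "en"
    }, {
      "type" : "not",
      "field" : {
        "type" : "selector", "dimension" : "robot", "value" : "true"
      }
    } ]
  } ]
}
\end{verbatim}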
The exact query syntax depends on the query type and the information requested.
A sample count query over a week of data is as follows:
\newpage
\begin{verbatim}
{
"queryType" : "timeseries",
@ -688,7 +692,6 @@ A sample count query over a week of data is shown below:
} ]
}
\end{verbatim}
The query shown above will return a count of the number of rows in the Wikipedia datasource
from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the "page" dimension is
equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
@ -713,7 +716,6 @@ equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array
}
} ]
\end{verbatim}
Druid supports many types of aggregations including double sums, long sums,
minimums, maximums, and several others. Druid also supports complex aggregations
such as cardinality estimation and approximate quantile estimation. The
@ -723,9 +725,15 @@ filter and group results based on almost any arbitrary condition. It is beyond
the scope of this paper to fully describe the query API but more information
can be found
online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}.
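To make the aggregation and grouping support concrete, a sketch of a groupBy
query that counts rows and sums a metric per dimension value is shown below;
the dimension and metric names are illustrative rather than taken from a real
schema:
\begin{verbatim}
{
  "queryType" : "groupBy",
  "dataSource" : "wikipedia",
  "granularity" : "day",
  "dimensions" : [ "language" ],
  "filter" : {
    "type" : "selector", "dimension" : "namespace", "value" : "article"
  },
  "aggregations" : [ {
    "type" : "count", "name" : "rows"
  }, {
    "type" : "doubleSum", "name" : "chars_added", "fieldName" : "added"
  } ],
  "intervals" : [ "2013-01-01T00:00:00/2013-01-08T00:00:00" ]
}
\end{verbatim}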
At the time of writing, the query language does not support joins. Although the
storage format is able to support joins, we've targeted Druid at user-facing
workloads that must return in a matter of seconds, and as such, we have chosen
not to implement joins: in our experience, requiring joins in queries often
limits the performance that can be achieved. Implementing joins and extending
the Druid API to understand SQL is something we'd like to do in future work.
\section{Experimental Results}
\label{sec:benchmarks}
To illustrate Druid's performance, we conducted a series of experiments that
focused on measuring Druid's query and data ingestion capabilities.
@ -768,11 +776,15 @@ Please note:
1 & \texttt{SELECT count(*) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
2 & \texttt{SELECT count(*), sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
3 & \texttt{SELECT count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
4 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
5 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
6 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
\end{tabular}
\end{table*}
4 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt FROM \_table\_
WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER
BY cnt limit 100} \\ \hline 5 & \texttt{SELECT high\_card\_dimension, count(*)
AS cnt, sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?
GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline 6 &
\texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1),
sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$
? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\
\hline \end{tabular} \end{table*}
Figure~\ref{fig:cluster_scan_rate} shows the cluster scan rate and
Figure~\ref{fig:core_scan_rate} shows the core scan rate. In
@ -787,20 +799,12 @@ remain nearly constant. The increase in speed of a parallel computing system
is often limited by the time needed for the sequential operations of the
system, in accordance with Amdahl's law \cite{amdahl1967validity}.
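Recall that if a fraction $p$ of a computation can be parallelized over $n$
cores, Amdahl's law bounds the achievable speedup by
\[
S(n) = \frac{1}{(1 - p) + p/n},
\]
so the serial fraction $1 - p$ increasingly dominates as cores are added,
consistent with the scaling behavior we observe.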
\begin{figure}
\centering
\includegraphics[width = 2.8in]{cluster_scan_rate}
\caption{Druid cluster scan rate with lines indicating linear scaling
from 25 nodes.}
\label{fig:cluster_scan_rate}
\end{figure}
\begin{figure}
\centering
\includegraphics[width = 2.8in]{core_scan_rate}
\caption{Druid core scan rate.}
\label{fig:core_scan_rate}
\end{figure}
The first query listed in Table~\ref{tab:sql_queries} is a simple
count, achieving scan rates of 33M rows/second/core. We believe
@ -878,56 +882,8 @@ than the number of dimensions.}
\label{fig:throughput_vs_num_metrics}
\end{figure}
\section{Druid in Production}
\label{sec:production}
Over the last few years of using Druid, we've gained tremendous
knowledge about handling production workloads, setting up correct operational
monitoring, integrating Druid with other products as part of a more
@ -1007,7 +963,56 @@ to have nodes in one data center act as a primary cluster (and receive all
queries) and have a redundant cluster in another data center. Such a setup may
be desired if one data center is situated much closer to users.
\section{Related Work}
\label{sec:related}
Cattell \cite{cattell2011scalable} maintains a great summary of existing
scalable SQL and NoSQL data stores. Hu \cite{hu2011stream} contributed another
great summary, covering streaming databases. Feature-wise, Druid sits
somewhere between Google's Dremel \cite{melnik2010dremel} and PowerDrill
\cite{hall2012processing}. Druid has most of the features implemented in
Dremel (Dremel handles arbitrary nested data structures while Druid only
allows for a single level of array-based nesting) and many of the interesting
compression algorithms mentioned in PowerDrill.
Although Druid builds on many of the same principles as other distributed
columnar data stores \cite{fink2012distributed}, many of these data stores are
designed to be more generic key-value stores \cite{lakshman2010cassandra} and
do not support computation directly in the storage layer. There are also other
data stores designed for some of the same data warehousing issues that Druid
is meant to solve. These systems include in-memory databases such as SAP's
HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb}. These data
stores lack Druid's low latency ingestion characteristics. Druid also has
native analytical features baked in, similar to \cite{paraccel2013}; however,
Druid allows system-wide rolling software updates with no downtime.
Druid is similar to \cite{stonebraker2005c, cipar2012lazybase} in that it has
two subsystems: a read-optimized subsystem in the historical nodes and a
write-optimized subsystem in the real-time nodes. Real-time nodes are designed
to ingest a high volume of append-heavy data, and do not support data updates.
Unlike the two aforementioned systems, Druid is meant for OLAP transactions
and not OLTP transactions.
Druid's low latency data ingestion features share some similarities with
Trident/Storm \cite{marz2013storm} and Streaming Spark
\cite{zaharia2012discretized}; however, both of those systems are focused on
stream processing, whereas Druid is focused on ingestion and aggregation.
Stream processors are great complements to Druid as a means of pre-processing
data before it enters Druid.
There is a class of systems that specialize in queries on top of cluster
computing frameworks. Shark \cite{engle2012shark} is such a system for queries
on top of Spark, and Cloudera's Impala \cite{cloudera2013} is another system
focused on optimizing query performance on top of HDFS. Druid historical
nodes download data locally and only work with native Druid indexes. We
believe this setup allows for faster query latencies.
Druid leverages a unique combination of algorithms in its architecture.
Although we believe no other data store has the same set of functionality as
Druid, some of Druid's optimization techniques, such as using inverted indices
to perform fast filters, are also used in other data stores
\cite{macnicol2004sybase}.
\section{Conclusions}
\label{sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented, real-time
analytical data store. Druid is designed to power high performance applications
@ -1016,11 +1021,6 @@ ingestion and is fault-tolerant. We discussed Druid's
performance benchmarks and summarized key architectural aspects such as the
storage format, query language, and general execution.
\balance
\section{Acknowledgements}