minor rewording and tighter formatting

This commit is contained in:
Xavier Léauté 2014-03-12 21:15:27 -07:00
parent 7d16960e23
commit bfe502a46a
1 changed file with 71 additions and 63 deletions


@@ -121,7 +121,6 @@ edit.
 \begin{table*}
 \centering
-\caption{Sample Druid data for edits that have occurred on Wikipedia.}
 \label{tab:sample_data}
 \begin{tabular}{| l | l | l | l | l | l | l | l |}
 \hline
@@ -131,6 +130,7 @@ edit.
 2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline
 2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline
 \end{tabular}
+\caption{Sample Druid data for edits that have occurred on Wikipedia.}
 \end{table*}
 Our goal is to rapidly compute drill-downs and aggregates over this data. We
@@ -537,17 +537,17 @@ method to compress data and has been used in other data stores such as
 PowerDrill \cite{hall2012processing}. In the example in
 Table~\ref{tab:sample_data}, we can map each page to a unique integer
 identifier.
-\begin{verbatim}
+{\small\begin{verbatim}
 Justin Bieber -> 0
 Ke$ha -> 1
-\end{verbatim}
+\end{verbatim}}
 This mapping allows us to represent the page column as an integer
 array where the array indices correspond to the rows of the original
 data set. For the page column, we can represent the unique
 pages as follows:
-\begin{verbatim}
+{\small\begin{verbatim}
 [0, 0, 1, 1]
-\end{verbatim}
+\end{verbatim}}
 The resulting integer array lends itself very well to
 compression methods. Generic compression algorithms on top of encodings are
@@ -558,10 +558,10 @@ Similar compression methods can be applied to numeric
 columns. For example, the characters added and characters removed columns in
 Table~\ref{tab:sample_data} can also be expressed as individual
 arrays.
-\begin{verbatim}
+{\small\begin{verbatim}
 Characters Added -> [1800, 2912, 1953, 3194]
 Characters Removed -> [25, 42, 17, 170]
-\end{verbatim}
+\end{verbatim}}
 In this case, we compress the raw values as opposed to their dictionary
 representations.
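Compressing the raw values of a numeric column can be sketched with standard-library tools. This is a minimal illustration, with zlib standing in for whatever generic codec the store actually applies (an assumption; the excerpt does not name the codec):

```python
import struct
import zlib

# Pack the raw metric values from the sample table as fixed-width integers
# and run a generic compressor over the resulting bytes, as the text
# describes. Illustrative only; not Druid's storage format.
chars_added = [1800, 2912, 1953, 3194]

def compress_column(values):
    # 8-byte little-endian signed integers, one per row
    raw = struct.pack("<%dq" % len(values), *values)
    return zlib.compress(raw)

compressed = compress_column(chars_added)
# The blob round-trips back to the original raw values.
restored = list(struct.unpack("<4q", zlib.decompress(compressed)))
assert restored == chars_added
```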
@@ -583,18 +583,18 @@ indicating in which table rows a particular page is seen. We can
 store this information in a binary array where the array indices
 represent our rows. If a particular page is seen in a certain
 row, that array index is marked as \texttt{1}. For example:
-\begin{verbatim}
+{\small\begin{verbatim}
 Justin Bieber -> rows [0, 1] -> [1][1][0][0]
 Ke$ha -> rows [2, 3] -> [0][0][1][1]
-\end{verbatim}
+\end{verbatim}}
 \texttt{Justin Bieber} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values
 to row indices forms an inverted index \cite{tomasic1993performance}. To know which
 rows contain {\ttfamily Justin Bieber} or {\ttfamily Ke\$ha}, we can \texttt{OR} together
 the two arrays.
-\begin{verbatim}
+{\small\begin{verbatim}
 [0][1][0][1] OR [1][0][1][0] = [1][1][1][1]
-\end{verbatim}
+\end{verbatim}}
 \begin{figure}
 \centering
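The inverted-index OR in this hunk can be sketched with plain lists of bits (a toy model; real deployments use compressed bitmaps rather than one Python int per row):

```python
# One bitmap per column value, one bit per row. OR-ing two bitmaps answers
# "which rows contain either value" without scanning the column itself.
bitmaps = {
    "Justin Bieber": [1, 1, 0, 0],  # appears in rows 0 and 1
    "Ke$ha":         [0, 0, 1, 1],  # appears in rows 2 and 3
}

def bitmap_or(a, b):
    return [x | y for x, y in zip(a, b)]

rows = bitmap_or(bitmaps["Justin Bieber"], bitmaps["Ke$ha"])
# rows -> [1, 1, 1, 1]: every row matches one of the two pages
```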
@@ -664,7 +664,7 @@ into data at any depth.
 The exact query syntax depends on the query type and the information requested.
 A sample count query over a week of data is as follows:
-\begin{verbatim}
+{\scriptsize\begin{verbatim}
 {
 "queryType" : "timeseries",
 "dataSource" : "wikipedia",
@@ -677,11 +677,11 @@ A sample count query over a week of data is as follows:
 "granularity" : "day",
 "aggregations" : [{"type":"count", "name":"rows"}]
 }
-\end{verbatim}
+\end{verbatim}}
 The query shown above will return a count of the number of rows in the Wikipedia datasource
 from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the "page" dimension is
 equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
-\begin{verbatim}
+{\scriptsize\begin{verbatim}
 [ {
 "timestamp": "2012-01-01T00:00:00.000Z",
 "result": {"rows":393298}
@@ -695,7 +695,7 @@ equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array
 "timestamp": "2012-01-07T00:00:00.000Z",
 "result": {"rows": 1337}
 } ]
-\end{verbatim}
+\end{verbatim}}
 Druid supports many types of aggregations including double sums, long sums,
 minimums, maximums, and complex aggregations such as cardinality estimation and
 approximate quantile estimation. The results of aggregations can be combined
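Since both the query and its result in this section are plain JSON, a minimal client needs no special library. The sketch below is hypothetical: the filter and interval fields are not shown in this excerpt, so their exact shapes here are assumptions, and the response is a truncated stand-in for the array shown above.

```python
import json

# Build a timeseries query like the one in the text. Field shapes for
# "intervals" and "filter" are assumed for illustration; check the Druid
# query documentation for the authoritative syntax.
query = {
    "queryType":    "timeseries",
    "dataSource":   "wikipedia",
    "intervals":    ["2013-01-01/2013-01-08"],
    "filter":       {"type": "selector", "dimension": "page", "value": "Ke$ha"},
    "granularity":  "day",
    "aggregations": [{"type": "count", "name": "rows"}],
}
body = json.dumps(query)  # this string is what gets POSTed to a broker node

# Decode a response of the form shown above and total the daily counts.
response = '[{"timestamp": "2012-01-01T00:00:00.000Z", "result": {"rows": 393298}}]'
total = sum(bucket["result"]["rows"] for bucket in json.loads(response))
```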
@@ -712,11 +712,11 @@ requiring joins on your queries often limits the performance you can achieve.
 \section{Performance}
 \label{sec:benchmarks}
 Druid runs in production at several organizations, and to demonstrate its
-performance, we've chosen to share some real world numbers of the production
-cluster at Metamarkets. The date range of the data is for Feburary 2014.
+performance, we have chosen to share some real world numbers for the main production
+cluster running at Metamarkets in early 2014.
 \subsection{Query Performance}
-Druid query performance can vary signficantly depending on the actual query
+Druid query performance can vary significantly depending on the query
 being issued. For example, sorting the values of a high cardinality dimension
 based on a given metric is much more expensive than a simple count over a time
 range. To showcase the average query latencies in a production Druid cluster,
@@ -731,7 +731,6 @@ involving all columns are very rare.
 \begin{table}
 \centering
-\caption{Dimensions and metrics of the 8 most queried Druid data sources in production.}
 \label{tab:datasources}
 \begin{tabular}{| l | l | l |}
 \hline
@@ -745,59 +744,64 @@ involving all columns are very rare.
 \texttt{g} & 26 & 18 \\ \hline
 \texttt{h} & 78 & 14 \\ \hline
 \end{tabular}
+\caption{Characteristics of production data sources.}
 \end{table}
 Some more details of our results:
 \begin{itemize}
-\item The results are from a "hot" tier in our production cluster. We run several tiers of varying performance in production.
-\item There is approximately 10.5TB of RAM available in the "hot" tier and approximately 10TB of segments loaded (including replication). Collectively, there are about 50 billion Druid rows in this tier. Results for every data source are not shown.
-\item The hot tier uses Xeon E5-2670 processors and consists of 1302 processing threads and 672 total cores (hyperthreaded).
-\item A memory-mapped storage engine was used (the machine was configured to memory map the data
-instead of loading it into the Java heap.)
+\item The results are from a "hot" tier in our production cluster. We run
+several tiers of varying performance in production.
+\item There is approximately 10.5TB of RAM available in the "hot" tier and
+approximately 10TB of segments loaded (including replication). Collectively,
+there are about 50 billion Druid rows in this tier. Results for
+every data source are not shown.
+\item The hot tier uses Xeon E5-2670 processors and consists of 1302 processing
+threads and 672 total cores (hyperthreaded).
+\item A memory-mapped storage engine was used (the machine was configured to
+memory map the data instead of loading it into the Java heap.)
 \end{itemize}
-The average query latency is shown in Figure~\ref{fig:avg_query_latency} and
+The average query latency is shown in Figure~\ref{fig:query_latency} and
 the queries per minute is shown in Figure~\ref{fig:queries_per_min}. We can see
 that across the various data sources, the average query latency is approximately
 540ms. The 90th percentile query latency across these data sources is < 1s, the
-95th percentile is < 2s, and the 99th percentile is < 10s. The percentiles are
-shown in Figure~\ref{fig:query_percentiles}.
+95th percentile is < 2s, and the 99th percentile is < 10s.
 \begin{figure}
 \centering
 \includegraphics[width = 2.8in]{avg_query_latency}
-\caption{Druid production cluster average query latencies for multiple data sources.}
-\label{fig:avg_query_latency}
+\includegraphics[width = 2.8in]{query_percentiles}
+\caption{Query latencies of production data sources.}
+\label{fig:query_latency}
 \end{figure}
 \begin{figure}
 \centering
 \includegraphics[width = 2.8in]{queries_per_min}
-\caption{Druid production cluster queries per minute for multiple data sources.}
+\caption{Queries per minute of production data sources.}
 \label{fig:queries_per_min}
 \end{figure}
-\begin{figure}
-\centering
-\includegraphics[width = 2.8in]{query_percentiles}
-\caption{Druid production cluster 90th, 95th, and 99th query latency percentiles for the 8 most queried data sources.}
-\label{fig:query_percentiles}
-\end{figure}
-We also present Druid benchmarks with TPC-H data. Our setup used Amazon EC2
-m3.2xlarge (CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz) instances for
+We also present Druid benchmarks on TPC-H data. Our setup used Amazon EC2
+\texttt{m3.2xlarge} (CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz) instances for
 historical nodes. Most TPC-H queries do not directly apply to Druid, so we
-selected similiar queries to demonstrate Druid's query performance. As a
-comparison, we also provide the results of the same queries using MySQL with
-MyISAM (InnoDB was slower in our experiments). Our MySQL setup was an Amazon
-RDS instance that also ran on an m3.2xlarge node.We selected MySQL to benchmark
+selected queries more typical of Druid's workload to demonstrate query performance. As a
+comparison, we also provide the results of the same queries using MySQL with the
+MyISAM engine (InnoDB was slower in our experiments). Our MySQL setup was an Amazon
+RDS instance that ran on the same instance type. We selected MySQL to benchmark
 against because of its universal popularity. We chose not to select another
 open source column store because we were not confident we could correctly tune
-it for optimal performance. The results for the 1 GB TPC-H data set are shown
+it for optimal performance.
+The results for the 1 GB TPC-H data set are shown
 in Figure~\ref{fig:tpch_1gb} and the results of the 100 GB data set are shown
 in Figure~\ref{fig:tpch_100gb}. We benchmarked Druid's scan rate at
-53,539,211.1 rows/second/core for count(*) over a given interval and 36,246,530
-rows/second/core for an aggregation involving floats.
+53,539,211 rows/second/core for a \texttt{select count(*)} equivalent query over a given time interval
+and 36,246,530 rows/second/core for a \texttt{select sum(float)} type query.
 \begin{figure}
 \centering
@@ -846,16 +850,16 @@ dimensions in each event, the number of metrics in each event, and the types of
 aggregations we want to perform on those metrics. With the most basic data set
 (one that only has a timestamp column), our setup can ingest data at a rate of
 800,000 events/sec/core, which is really just a measurement of how fast we can
-deserialize events. Real world data sets are never this simple. A description
-of the data sources we selected is shown in Table~\ref{tab:ingest_datasources}.
+deserialize events. Real world data sets are never this simple.
+Table~\ref{tab:ingest_datasources} shows a selection of data sources and their
+characteristics.
 \begin{table}
 \centering
-\caption{Dimensions, metrics, and peak throughputs of various ingested data sources.}
 \label{tab:ingest_datasources}
 \begin{tabular}{| l | l | l | l |}
 \hline
-\textbf{Data Source} & \textbf{Dims} & \textbf{Mets} & \textbf{Peak Throughput (events/sec)} \\ \hline
+\scriptsize\textbf{Data Source} & \scriptsize\textbf{Dimensions} & \scriptsize\textbf{Metrics} & \scriptsize\textbf{Peak Throughput (events/sec)} \\ \hline
 \texttt{s} & 7 & 2 & 28334.60 \\ \hline
 \texttt{t} & 10 & 7 & 68808.70 \\ \hline
 \texttt{u} & 5 & 1 & 49933.93 \\ \hline
@@ -865,6 +869,7 @@ of the data sources we selected is shown in Table~\ref{tab:ingest_datasources}.
 \texttt{y} & 33 & 24 & 162462.41 \\ \hline
 \texttt{z} & 33 & 24 & 95747.74 \\ \hline
 \end{tabular}
+\caption{Ingestion characteristics of various data sources.}
 \end{table}
 We can see that based on the descriptions in
@@ -876,13 +881,13 @@ Figure~\ref{fig:ingestion_rate}. We define throughput as the number of events a
 real-time node can ingest and also make queryable. If too many events are sent
 to the real-time node, those events are blocked until the real-time node has
 capacity to accept them. The peak ingestion rate we measured in production
-was 22914.43 events/sec/core on an Amazon EC2 cc2.8xlarge. The data source had
-30 dimensions and 19 metrics.
+was 22914.43 events/sec/core on a datasource with 30 dimensions and 19 metrics,
+running on an Amazon \texttt{cc2.8xlarge} instance.
 \begin{figure}
 \centering
 \includegraphics[width = 2.8in]{ingestion_rate}
-\caption{Druid production cluster ingestion rates for multiple data sources.}
+\caption{Combined cluster ingestion rates.}
 \label{fig:ingestion_rate}
 \end{figure}
@@ -896,8 +901,9 @@ cost is still a consideration to us.
 \label{sec:production}
 Over the last few years, we've gained tremendous knowledge about handling
 production workloads with Druid. Some of our more interesting observations include:
-\begin{itemize}
-\item Druid is often used to explore data and generate reports on data. In the
+
+\paragraph{Query Patterns}
+Druid is often used to explore data and generate reports on data. In the
 explore use case, the number of queries issued by a single user is much higher
 than in the reporting use case. Exploratory queries often involve progressively
 adding filters for the same time range to narrow down results. Users tend to
@@ -905,7 +911,8 @@ explore short time intervals of recent data. In the reporting use case, users
 query for a much larger data interval, but already have a set of queries in
 mind.
-\item Expensive concurrent queries can be problematic in a multitenant
+
+\paragraph{Multitenant Workload}
+Expensive concurrent queries can be problematic in a multitenant
 environment. Queries for large datasources may end up hitting every historical
 node in a cluster and consume all cluster resources. Smaller, cheaper queries
 may be blocked from executing in such cases. We introduced query prioritization
@@ -915,7 +922,8 @@ workloads. Thankfully, queries for a significant amount of data tend to be for
 reporting use cases, and users are not expecting the same level of
 interactivity as when they are querying to explore data.
-\item Node failures are common in a distributed environment, but many nodes at
+
+\paragraph{Node Failures}
+Node failures are common in a distributed environment, but many nodes at
 once failing are not. If historical nodes fail and do not recover, their
 segments need to be reassigned, which means we need excess cluster capacity to
 load this data. The amount of additional capacity to have at any time is a
@@ -923,14 +931,14 @@ factor of cost. It is extremely rare to see more than 2 nodes fail at once and
 never recover and hence, we leave enough capacity to completely reassign the
 data from 2 historical nodes.
-\item Complete cluster failures are possible, but extremely rare. When running
+
+\paragraph{Outages}
+Complete cluster failures are possible, but extremely rare. When running
 in a single data center, it is possible for the entire data center to fail. In
 such a case, a new cluster needs to be created. As long as deep storage is
 available, cluster recovery time is network bound. Historical nodes need to
 reload every segment from deep storage. We have experienced such a failure in
 the past, and it took several hours for our entire Druid cluster to recover on
 several TBs of data.
-\end{itemize}
 \subsection{Operational Monitoring}
 Proper monitoring is critical to run a large scale distributed cluster.