From bfe502a46acd0cc49e3aff440cbc39bdb0812232 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 12 Mar 2014 21:15:27 -0700 Subject: [PATCH] minor rewording and tighter formatting --- publications/whitepaper/druid.tex | 134 ++++++++++++++++-------------- 1 file changed, 71 insertions(+), 63 deletions(-) diff --git a/publications/whitepaper/druid.tex b/publications/whitepaper/druid.tex index 20f59e56eea..419dffb2583 100644 --- a/publications/whitepaper/druid.tex +++ b/publications/whitepaper/druid.tex @@ -121,7 +121,6 @@ edit. \begin{table*} \centering - \caption{Sample Druid data for edits that have occurred on Wikipedia.} \label{tab:sample_data} \begin{tabular}{| l | l | l | l | l | l | l | l |} \hline @@ -131,6 +130,7 @@ edit. 2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline 2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline \end{tabular} + \caption{Sample Druid data for edits that have occurred on Wikipedia.} \end{table*} Our goal is to rapidly compute drill-downs and aggregates over this data. We @@ -537,17 +537,17 @@ method to compress data and has been used in other data stores such as PowerDrill \cite{hall2012processing}. In the example in Table~\ref{tab:sample_data}, we can map each page to an unique integer identifier. -\begin{verbatim} +{\small\begin{verbatim} Justin Bieber -> 0 Ke$ha -> 1 -\end{verbatim} +\end{verbatim}} This mapping allows us to represent the page column as an integer array where the array indices correspond to the rows of the original data set. For the page column, we can represent the unique pages as follows: -\begin{verbatim} +{\small\begin{verbatim} [0, 0, 1, 1] -\end{verbatim} +\end{verbatim}} The resulting integer array lends itself very well to compression methods. Generic compression algorithms on top of encodings are @@ -558,10 +558,10 @@ Similar compression methods can be applied to numeric columns. For example, the characters added and characters removed columns in Table~\ref{tab:sample_data} can also be expressed as individual arrays. -\begin{verbatim} -Characters Added -> [1800, 2912, 1953, 3194] +{\small\begin{verbatim} +Characters Added -> [1800, 2912, 1953, 3194] Characters Removed -> [25, 42, 17, 170] -\end{verbatim} +\end{verbatim}} In this case, we compress the raw values as opposed to their dictionary representations. @@ -583,18 +583,18 @@ indicating in which table rows a particular page is seen. We can store this information in a binary array where the array indices represent our rows. If a particular page is seen in a certain row, that array index is marked as \texttt{1}. For example: -\begin{verbatim} -Justin Bieber -> rows [0, 1] -> [1][1][0][0] -Ke$ha -> rows [2, 3] -> [0][0][1][1] -\end{verbatim} +{\small\begin{verbatim} +Justin Bieber -> rows [0, 1] -> [1][1][0][0] +Ke$ha -> rows [2, 3] -> [0][0][1][1] +\end{verbatim}} \texttt{Justin Bieber} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values to row indices forms an inverted index \cite{tomasic1993performance}. To know which rows contain {\ttfamily Justin Bieber} or {\ttfamily Ke\$ha}, we can \texttt{OR} together the two arrays. -\begin{verbatim} +{\small\begin{verbatim} [0][1][0][1] OR [1][0][1][0] = [1][1][1][1] -\end{verbatim} +\end{verbatim}} \begin{figure} \centering @@ -664,7 +664,7 @@ into data at any depth. The exact query syntax depends on the query type and the information requested. A sample count query over a week of data is as follows: -\begin{verbatim} +{\scriptsize\begin{verbatim} { "queryType" : "timeseries", "dataSource" : "wikipedia", @@ -677,11 +677,11 @@ A sample count query over a week of data is as follows: "granularity" : "day", "aggregations" : [{"type":"count", "name":"rows"}] } -\end{verbatim} +\end{verbatim}} The query shown above will return a count of the number of rows in the Wikipedia datasource from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the "page" dimension is equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form: -\begin{verbatim} +{\scriptsize\begin{verbatim} [ { "timestamp": "2012-01-01T00:00:00.000Z", "result": {"rows":393298} @@ -695,7 +695,7 @@ equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array "timestamp": "2012-01-07T00:00:00.000Z", "result": {"rows": 1337} } ] -\end{verbatim} +\end{verbatim}} Druid supports many types of aggregations including double sums, long sums, minimums, maximums, and complex aggregations such as cardinality estimation and approximate quantile estimation. The results of aggregations can be combined @@ -712,11 +712,11 @@ requiring joins on your queries often limits the performance you can achieve. \section{Performance} \label{sec:benchmarks} Druid runs in production at several organizations, and to demonstrate its -performance, we've chosen to share some real world numbers of the production -cluster at Metamarkets. The date range of the data is for Feburary 2014. +performance, we have chosen to share some real world numbers for the main production +cluster running at Metamarkets in early 2014. \subsection{Query Performance} -Druid query performance can vary signficantly depending on the actual query +Druid query performance can vary signficantly depending on the query being issued. For example, sorting the values of a high cardinality dimension based on a given metric is much more expensive than a simple count over a time range. To showcase the average query latencies in a production Druid cluster, @@ -731,7 +731,6 @@ involving all columns are very rare. \begin{table} \centering - \caption{Dimensions and metrics of the 8 most queried Druid data sources in production.} \label{tab:datasources} \begin{tabular}{| l | l | l |} \hline @@ -745,59 +744,64 @@ involving all columns are very rare. \texttt{g} & 26 & 18 \\ \hline \texttt{h} & 78 & 14 \\ \hline \end{tabular} + \caption{Characteristics of production data sources.} \end{table} Some more details of our results: \begin{itemize} -\item The results are from a "hot" tier in our production cluster. We run several tiers of varying performance in production. -\item There is approximately 10.5TB of RAM available in the "hot" tier and approximately 10TB of segments loaded (including replication). Collectively, there are about 50 billion Druid rows in this tier. Results for every data source are not shown. -\item The hot tier uses Xeon E5-2670 processors and consists of 1302 processing threads and 672 total cores (hyperthreaded). -\item A memory-mapped storage engine was used (the machine was configured to memory map the data - instead of loading it into the Java heap.) +\item The results are from a "hot" tier in our production cluster. We run +several tiers of varying performance in production. + +\item There is approximately 10.5TB of RAM available in the "hot" tier and +approximately 10TB of segments loaded (including replication). Collectively, +there are about 50 billion Druid rows in this tier. Results for +every data source are not shown. + +\item The hot tier uses Xeon E5-2670 processors and consists of 1302 processing +threads and 672 total cores (hyperthreaded). + +\item A memory-mapped storage engine was used (the machine was configured to + memory map the data instead of loading it into the Java heap.) \end{itemize} -The average query latency is shown in Figure~\ref{fig:avg_query_latency} and +The average query latency is shown in Figure~\ref{fig:query_latency} and the queries per minute is shown in Figure~\ref{fig:queries_per_min}. We can see that across the various data sources, the average query latency is approximately 540ms. The 90th percentile query latency across these data sources is < 1s, the -95th percentile is < 2s, and the 99th percentile is < 10s. The percentiles are -shown in Figure~\ref{fig:query_percentiles}. +95th percentile is < 2s, and the 99th percentile is < 10s. + \begin{figure} \centering \includegraphics[width = 2.8in]{avg_query_latency} -\caption{Druid production cluster average query latencies for multiple data sources.} -\label{fig:avg_query_latency} +\includegraphics[width = 2.8in]{query_percentiles} +\caption{Query latencies of production data sources.} +\label{fig:query_latency} \end{figure} \begin{figure} \centering \includegraphics[width = 2.8in]{queries_per_min} -\caption{Druid production cluster queries per minute for multiple data sources.} +\caption{Queries per minute of production data sources.} \label{fig:queries_per_min} \end{figure} -\begin{figure} -\centering -\includegraphics[width = 2.8in]{query_percentiles} -\caption{Druid production cluster 90th, 95th, and 99th query latency percentiles for the 8 most queried data sources.} -\label{fig:query_percentiles} -\end{figure} - -We also present Druid benchmarks with TPC-H data. Our setup used Amazon EC2 -m3.2xlarge (CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz) instances for +We also present Druid benchmarks on TPC-H data. Our setup used Amazon EC2 +\texttt{m3.2xlarge} (CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz) instances for historical nodes. Most TPC-H queries do not directly apply to Druid, so we -selected similiar queries to demonstrate Druid's query performance. As a -comparison, we also provide the results of the same queries using MySQL with -MyISAM (InnoDB was slower in our experiments). Our MySQL setup was an Amazon -RDS instance that also ran on an m3.2xlarge node.We selected MySQL to benchmark +selected queries more typical of Druid's workload to demonstrate query performance. As a +comparison, we also provide the results of the same queries using MySQL using the +MyISAM engine (InnoDB was slower in our experiments). Our MySQL setup was an Amazon +RDS instance that ran on the same instance type. We selected MySQL to benchmark against because of its universal popularity. We choose not to select another open source column store because we were not confident we could correctly tune -it for optimal performance. The results for the 1 GB TPC-H data set are shown +it for optimal performance. + +The results for the 1 GB TPC-H data set are shown in Figure~\ref{fig:tpch_1gb} and the results of the 100 GB data set are shown in Figure~\ref{fig:tpch_100gb}. We benchmarked Druid's scan rate at -53,539,211.1 rows/second/core for count(*) over a given interval and 36,246,530 -rows/second/core for an aggregation involving floats. +53,539,211 rows/second/core for \texttt{select count(*)} equivalent query over a given time interval +and 36,246,530 rows/second/core for a \texttt{select sum(float)} type query. \begin{figure} \centering @@ -846,16 +850,16 @@ dimensions in each event, the number of metrics in each event, and the types of aggregations we want to perform on those metrics. With the most basic data set (one that only has a timestamp column), our setup can ingest data at a rate of 800,000 events/sec/core, which is really just a measurement of how fast we can -deserialize events. Real world data sets are never this simple. A description -of the data sources we selected is shown in Table~\ref{tab:ingest_datasources}. +deserialize events. Real world data sets are never this simple. +Table~\ref{tab:ingest_datasources} shows a selection of data sources and their +chracteristics. \begin{table} \centering - \caption{Dimensions, metrics, and peak throughputs of various ingested data sources.} \label{tab:ingest_datasources} \begin{tabular}{| l | l | l | l |} \hline - \textbf{Data Source} & \textbf{Dims} & \textbf{Mets} & \textbf{Peak Throughput (events/sec)} \\ \hline + \scriptsize\textbf{Data Source} & \scriptsize\textbf{Dimensions} & \scriptsize\textbf{Metrics} & \scriptsize\textbf{Peak Throughput (events/sec)} \\ \hline \texttt{s} & 7 & 2 & 28334.60 \\ \hline \texttt{t} & 10 & 7 & 68808.70 \\ \hline \texttt{u} & 5 & 1 & 49933.93 \\ \hline @@ -865,6 +869,7 @@ of the data sources we selected is shown in Table~\ref{tab:ingest_datasources}. \texttt{y} & 33 & 24 & 162462.41 \\ \hline \texttt{z} & 33 & 24 & 95747.74 \\ \hline \end{tabular} + \caption{Ingestion characteristics of various data sources.} \end{table} We can see that based on the descriptions in @@ -875,14 +880,14 @@ rate that the data producer was delivering data. The results are shown in Figure~\ref{fig:ingestion_rate}. We define throughput as the number of events a real-time node can ingest and also make queryable. If too many events are sent to the real-time node, those events are blocked until the real-time node has -capacity to accept them.The peak ingestion latency we measured in production -was 22914.43 events/sec/core on an Amazon EC2 cc2.8xlarge. The data source had -30 dimensions and 19 metrics. +capacity to accept them. The peak ingestion latency we measured in production +was 22914.43 events/sec/core on a datasource with 30 dimensions and 19 metrics, +running an Amazon \texttt{cc2.8xlarge} instance. \begin{figure} \centering \includegraphics[width = 2.8in]{ingestion_rate} -\caption{Druid production cluster ingestion rates for multiple data sources.} +\caption{Combined cluster ingestion rates.} \label{fig:ingestion_rate} \end{figure} @@ -896,8 +901,9 @@ cost is still a consideration to us. \label{sec:production} Over the last few years, we've gained tremendous knowledge about handling production workloads with Druid. Some of our more interesting observations include: -\begin{itemize} -\item Druid is often used to explore data and generate reports on data. In the + +\paragraph{Query Patterns} +Druid is often used to explore data and generate reports on data. In the explore use case, the number of queries issued by a single user is much higher than in the reporting use case. Exploratory queries often involve progressively adding filters for the same time range to narrow down results. Users tend to @@ -905,7 +911,8 @@ explore short time intervals of recent data. In the reporting use case, users query for a much larger data interval, but already have a set of queries in mind. -\item Expensive concurrent queries can be problematic in a multitenant +\paragraph{Multitenant Workload} +Expensive concurrent queries can be problematic in a multitenant environment. Queries for large datasources may end up hitting every historical node in a cluster and consume all cluster resources. Smaller, cheaper queries may be blocked from executing in such cases. We introduced query prioritization @@ -915,7 +922,8 @@ workloads. Thankfully, queries for a significant amount of data tend to be for reporting use cases, and users are not expecting the same level of interactivity as when they are querying to explore data. -\item Node failures are common in a distributed environment, but many nodes at +\paragraph{Node failures} +Node failures are common in a distributed environment, but many nodes at once failing are not. If historical nodes fail and do not recover, their segments need to reassigned, which means we need excess cluster capacity to load this data. The amount of additional capacity to have at any time is a @@ -923,14 +931,14 @@ factor of cost. It is extremely rare to see more than 2 nodes fail at once and never recover and hence, we leave enough capacity to completely reassign the data from 2 historical nodes. -\item Complete cluster failures are possible, but extremely rare. When running +\paragraph{Outages} +Complete cluster failures are possible, but extremely rare. When running in a single data center, it is possible for the entire data center to fail. In such a case, a new cluster needs to be created. As long as deep storage is available, cluster recovery time is network bound. Historical nodes need to reload every segment from deep storage. We have experienced such a failure in the past, and it took several hours for our entire Druid cluster to recover on several TBs of data. -\end{itemize} \subsection{Operational Monitoring} Proper monitoring is critical to run a large scale distributed cluster.