finishing the paper

fjy 2014-03-13 21:38:09 -07:00
parent b4f1591260
commit 5fd506cd51
2 changed files with 20 additions and 19 deletions


@@ -76,7 +76,7 @@ came to the conclusion that there was nothing in the open source world that
 could be fully leveraged for our requirements.
 We ended up creating Druid, an open-source, distributed, column-oriented,
-realtime analytical data store. In many ways, Druid shares similarities with
+real-time analytical data store. In many ways, Druid shares similarities with
 other OLAP systems \cite{oehler2012ibm, schrader2009oracle, lachev2005applied},
 interactive query systems \cite{melnik2010dremel}, main-memory databases
 \cite{farber2012sap}, and widely-known distributed data stores
@@ -413,7 +413,7 @@ distribution on historical nodes. The coordinator nodes tell historical nodes
 to load new data, drop outdated data, replicate data, and move data to load
 balance. Druid uses a multi-version concurrency control swapping protocol for
 managing immutable segments in order to maintain stable views. If any
-immutable segment contains data that is wholly obseleted by newer segments, the
+immutable segment contains data that is wholly obsoleted by newer segments, the
 outdated segment is dropped from the cluster. Coordinator nodes undergo a
 leader-election process that determines a single node that runs the coordinator
 functionality. The remaining coordinator nodes act as redundant backups.
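
To make the versioning idea in this hunk concrete, here is a minimal, hypothetical sketch of the kind of segment descriptor the coordinator reasons about. The field names follow general Druid segment-metadata conventions rather than anything quoted in the paper, and the values are invented: a segment is identified by its data source, time interval, and version, so a segment whose interval is wholly covered by higher-versioned segments is the one the coordinator drops.

// illustrative sketch only: field names and values are assumptions, not taken from the paper
{
  "dataSource": "wikipedia",
  "interval": "2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z",
  "version": "2013-01-02T04:13:28.929Z",
  "loadSpec": {"type": "s3_zip", "bucket": "example-bucket", "key": "wikipedia/2013-01-01/index.zip"},
  "dimensions": "page,language,user,city",
  "metrics": "count,added,deleted",
  "shardSpec": {"type": "none"},
  "size": 463374149
}

A later ingestion run over the same interval would publish a descriptor with a newer version string; once historical nodes have loaded it, the older segment's data is wholly obsoleted and the coordinator tells them to drop it.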
@@ -678,7 +678,7 @@ A sample count query over a week of data is as follows:
   "aggregations" : [{"type":"count", "name":"rows"}]
 }
 \end{verbatim}}
-The query shown above will return a count of the number of rows in the Wikipedia datasource
+The query shown above will return a count of the number of rows in the Wikipedia data source
 from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the ``page" dimension is
 equal to ``Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
 {\scriptsize\begin{verbatim}
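
Since the hunk ends just as the result listing opens, a hedged sketch of the general shape such a day-bucketed timeseries result takes may help orientation; the row counts below are invented, and one such object appears per day in the queried interval.

// illustrative shape only: the counts are made up, not results from the paper
[
  {"timestamp": "2013-01-01T00:00:00.000Z", "result": {"rows": 101200}},
  {"timestamp": "2013-01-02T00:00:00.000Z", "result": {"rows": 98750}},
  {"timestamp": "2013-01-03T00:00:00.000Z", "result": {"rows": 103310}}
]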
@@ -780,11 +780,12 @@ involving all columns are very rare.
 A few notes about our results:
 \begin{itemize}[leftmargin=*,beginpenalty=5000,topsep=0pt]
-\item The results are from a ``hot" tier in our production cluster. We run
-several tiers of varying performance in production.
-\item There is approximately 10.5TB of RAM available in the ``hot" tier and
-approximately 10TB of segments loaded (including replication). Collectively,
+\item The results are from a ``hot" tier in our production cluster. There were
+approximately 50 data sources in the tier and several hundred users issuing
+queries.
+\item There was approximately 10.5TB of RAM available in the ``hot" tier and
+approximately 10TB of segments loaded. Collectively,
 there are about 50 billion Druid rows in this tier. Results for
 every data source are not shown.
@@ -796,13 +797,13 @@ threads and 672 total cores (hyperthreaded).
 \end{itemize}
 Query latencies are shown in Figure~\ref{fig:query_latency} and the queries per
 minute are shown in Figure~\ref{fig:queries_per_min}. Across all the various
 data sources, average query latency is approximately 550 milliseconds, with
 90\% of queries returning in less than 1 second, 95\% in under 2 seconds, and
-99\% of queries returning in less than 10 seconds.
-Occasionally we observe spikes in latency, as observed on February 19,
-in which case network issues on the Memcached instances were compounded by very high
-query load on one of our largest datasources.
+99\% of queries returning in less than 10 seconds. Occasionally we observe
+spikes in latency, as observed on February 19, in which case network issues on
+the Memcached instances were compounded by very high query load on one of our
+largest datasources.
 \begin{figure}
 \centering
@@ -881,7 +882,7 @@ ingestion setup consists of 6 nodes, totalling 360GB of RAM and 96 cores
 (12 x Intel Xeon E5-2670).
 Note that in this setup, several other data sources were being ingested and
-many other Druid related ingestion tasks were running concurrently on those machines.
+many other Druid related ingestion tasks were running concurrently on the machines.
 Druid's data ingestion latency is heavily dependent on the complexity of the
 data set being ingested. The data complexity is determined by the number of
@@ -948,19 +949,19 @@ explore use case, the number of queries issued by a single user is much higher
 than in the reporting use case. Exploratory queries often involve progressively
 adding filters for the same time range to narrow down results. Users tend to
 explore short time intervals of recent data. In the generate report use case,
-users query for much longer data intervals, but users also already have the
-queries they want to issue in mind.
+users query for much longer data intervals, but users also already know the
+queries they want to issue.
 \paragraph{Multitenancy}
 Expensive concurrent queries can be problematic in a multitenant
-environment. Queries for large datasources may end up hitting every historical
+environment. Queries for large data sources may end up hitting every historical
 node in a cluster and consume all cluster resources. Smaller, cheaper queries
 may be blocked from executing in such cases. We introduced query prioritization
 to address these issues. Each historical node is able to prioritize which
 segments it needs to scan. Proper query planning is critical for production
-workloads. Thankfully, queries for a significant amount of data tend to be for
-reporting use cases, and users are not expecting the same level of
-interactivity as when they are querying to explore data.
+workloads. Thankfully, queries for a significant amount of data tend to be for
+reporting use cases and can be deprioritized. Users do not expect the same level of
+interactivity in this use case as when they are exploring data.
 \paragraph{Node failures}
 Single node failures are common in distributed environments, but many nodes
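
Returning to the query prioritization discussed in the multitenancy paragraph above: a minimal sketch of how a long-interval reporting query could be marked as low priority, assuming a Druid-style "context" map with a "priority" key. Treat the exact key names and the value as assumptions rather than something stated in the paper.

// hypothetical example: the context/priority keys are assumed, not quoted from the paper
{
  "queryType" : "timeseries",
  "dataSource" : "wikipedia",
  "intervals" : "2012-01-01/2013-01-01",
  "granularity" : "all",
  "aggregations" : [{"type":"count", "name":"rows"}],
  "context" : {"priority": -1}
}

Interactive exploratory queries would omit the context or carry a higher priority, so historical nodes scan segments for them first.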
@@ -979,7 +980,7 @@ center to fail. In such cases, new machines need to be provisioned. As long as
 deep storage is still available, cluster recovery time is network bound as
 historical nodes simply need to redownload every segment from deep storage. We
 have experienced such failures in the past, and the recovery time was around
-several hours in the AWS ecosystem on several TBs of data.
+several hours in the AWS ecosystem for several TBs of data.
 \subsection{Operational Monitoring}
 Proper monitoring is critical to run a large scale distributed cluster.