typos, wording, fix overfull hboxes

This commit is contained in:
Xavier Léauté 2014-03-29 23:00:23 -07:00
parent bff902150e
commit 35099e4970
1 changed files with 131 additions and 132 deletions

View File

@ -73,7 +73,7 @@ aggregations, flexible filters, and low latency data ingestion.
% A category with the (minimum) three required fields
\category{H.2.4}{Database Management}{Systems}[Distributed databases]
% \category{D.2.8}{Software Engineering}{Metrics}[complexity measures, performance measures]
\keywords{distributed; real-time; fault-tolerant; analytics; column-oriented; OLAP}
\keywords{distributed; real-time; fault-tolerant; highly available; open source; analytics; column-oriented; OLAP}
\section{Introduction}
@ -96,7 +96,7 @@ large amounts of log data. Hadoop has contributed much to helping companies
convert their low-value event streams into high-value aggregates for a variety
of applications such as business intelligence and A-B testing.
As with a lot of great systems, Hadoop has opened our eyes to a new space of
As with many great systems, Hadoop has opened our eyes to a new space of
problems. Specifically, Hadoop excels at storing and providing access to large
amounts of data, however, it does not make any performance guarantees around
how quickly that data can be accessed. Furthermore, although Hadoop is a
@ -113,7 +113,7 @@ needs. We explored different solutions in the space, and after
trying both Relational Database Management Systems and NoSQL architectures, we
came to the conclusion that there was nothing in the open source world that
could be fully leveraged for our requirements. We ended up creating Druid, an
open-source, distributed, column-oriented, real-time analytical data store. In
open source, distributed, column-oriented, real-time analytical data store. In
many ways, Druid shares similarities with other OLAP systems
\cite{oehler2012ibm, schrader2009oracle, lachev2005applied},
interactive query systems \cite{melnik2010dremel}, main-memory databases
@ -196,7 +196,7 @@ system is unavailable in the face of software upgrades or network failure.
Downtime for startups, who often lack proper internal operations management, can
determine business success or failure.
Finally, another key problem that Metamarkets faced in its early days was to
Finally, another challenge that Metamarkets faced in its early days was to
allow users and alerting systems to be able to make business decisions in
``real-time". The time from when an event is created to when that event is
queryable determines how fast interested parties are able to react to
@ -240,11 +240,11 @@ periodically hand off immutable batches of events they have collected over this
small time range to other nodes in the Druid cluster that are specialized in
dealing with batches of immutable events. Real-time nodes leverage Zookeeper
\cite{hunt2010zookeeper} for coordination with the rest of the Druid cluster.
The nodes announce their online state and the data they are serving in
The nodes announce their online state and the data they serve in
Zookeeper.
Real-time nodes maintain an in-memory index buffer for all incoming events.
These indexes are incrementally populated as new events are ingested and the
These indexes are incrementally populated as events are ingested and the
indexes are also directly queryable. Druid behaves as a row store
for queries on events that exist in this JVM heap-based buffer. To avoid heap
overflow problems, real-time nodes persist their in-memory indexes to disk
@ -269,7 +269,7 @@ Queries will hit both the in-memory and persisted indexes.
On a periodic basis, each real-time node will schedule a background task that
searches for all locally persisted indexes. The task merges these indexes
together and builds an immutable block of data that contains all the events
that have ingested by a real-time node for some span of time. We refer to this
that have been ingested by a real-time node for some span of time. We refer to this
block of data as a ``segment". During the handoff stage, a real-time node
uploads this segment to a permanent backup storage, typically a distributed
file system such as S3 \cite{decandia2007dynamo} or HDFS
@ -337,7 +337,7 @@ multiple real-time nodes can read events. Multiple real-time nodes can ingest
the same set of events from the bus, creating a replication of events. In a
scenario where a node completely fails and loses disk, replicated streams
ensure that no data is lost. A single ingestion endpoint also allows for data
streams for be partitioned such that multiple real-time nodes each ingest a
streams to be partitioned such that multiple real-time nodes each ingest a
portion of a stream. This allows additional real-time nodes to be seamlessly
added. In practice, this model has allowed one of the largest production Druid
clusters to be able to consume raw data at approximately 500 MB/s (150,000
@ -394,9 +394,9 @@ can also be created with much less powerful backing hardware. The
\subsubsection{Availability}
Historical nodes depend on Zookeeper for segment load and unload instructions.
If Zookeeper becomes unavailable, historical nodes are no longer able to serve
new data and drop outdated data, however, because the queries are served over
HTTP, historical nodes are still be able to respond to query requests for
Should Zookeeper become unavailable, historical nodes are no longer able to serve
new data or drop outdated data, however, because the queries are served over
HTTP, historical nodes are still able to respond to query requests for
the data they are currently serving. This means that Zookeeper outages do not
impact current data availability on historical nodes.
@ -500,7 +500,7 @@ source, recency, and size. The exact details of the algorithm are beyond the
scope of this paper and may be discussed in future literature.
\subsubsection{Replication}
Coordinator nodes may tell different historical nodes to load copies of the
Coordinator nodes may tell different historical nodes to load a copy of the
same segment. The number of replicates in each tier of the historical compute
cluster is fully configurable. Setups that require high levels of fault
tolerance can be configured to have a high number of replicas. Replicated
@ -513,7 +513,7 @@ cluster. Over the last two years, we have never taken downtime in our Druid
cluster for software upgrades.
\subsubsection{Availability}
Druid coordinator nodes have two external dependencies: Zookeeper and MySQL.
Druid coordinator nodes have Zookeeper and MySQL as external dependencies.
Coordinator nodes rely on Zookeeper to determine what historical nodes already
exist in the cluster. If Zookeeper becomes unavailable, the coordinator will no
longer be able to send instructions to assign, balance, and drop segments.
@ -558,7 +558,7 @@ range.
Druid segments are stored in a column orientation. Given that Druid is best
used for aggregating event streams (all data going into Druid must have a
timestamp), the advantages storing aggregate information as columns rather than
timestamp), the advantages of storing aggregate information as columns rather than
rows are well documented \cite{abadi2008column}. Column storage allows for more
efficient CPU usage as only what is needed is actually loaded and scanned. In a
row oriented data store, all columns associated with a row must be scanned as
@ -573,7 +573,7 @@ contain strings. Storing strings directly is unnecessarily costly and string
columns can be dictionary encoded instead. Dictionary encoding is a common
method to compress data and has been used in other data stores such as
PowerDrill \cite{hall2012processing}. In the example in
Table~\ref{tab:sample_data}, we can map each page to an unique integer
Table~\ref{tab:sample_data}, we can map each page to a unique integer
identifier.
{\small\begin{verbatim}
Justin Bieber -> 0
@ -607,7 +607,7 @@ representations.
In many real world OLAP workflows, queries are issued for the aggregated
results of some set of metrics where some set of dimension specifications are
met. An example query is: ``How many Wikipedia edits were done by users in
San Francisco who are also male?". This query is filtering the Wikipedia data
San Francisco who are also male?" This query is filtering the Wikipedia data
set in Table~\ref{tab:sample_data} based on a Boolean expression of dimension
values. In many real world data sets, dimension columns contain strings and
metric columns contain numeric values. Druid creates additional lookup
@ -734,7 +734,7 @@ equal to ``Ke\$ha". The results will be bucketed by day and will be a JSON array
} ]
\end{verbatim}}
Druid supports many types of aggregations including double sums, long sums,
Druid supports many types of aggregations including sums on floating-point and integer types,
minimums, maximums, and complex aggregations such as cardinality estimation and
approximate quantile estimation. The results of aggregations can be combined
in mathematical expressions to form other aggregations. It is beyond the scope
@ -756,8 +756,8 @@ The reasons for this decision are generally two-fold.
\end{enumerate}
A join query is essentially the merging of two or more streams of data based on
a shared set of keys. The primary high-level strategies for join queries the
authors are aware of are a hash-based strategy or a sorted-merge strategy. The
a shared set of keys. The primary high-level strategies for join queries we
are aware of are a hash-based strategy or a sorted-merge strategy. The
hash-based strategy requires that all but one data set be available as
something that looks like a hash table, a lookup operation is then performed on
this hash table for every row in the ``primary" stream. The sorted-merge
@ -770,16 +770,15 @@ When all sides of the join are significantly large tables (> 1 billion records),
materializing the pre-join streams requires complex distributed memory
management. The complexity of the memory management is only amplified by
the fact that we are targeting highly concurrent, multitenant workloads.
This is, as far as the authors are aware, an active academic research
problem that we would be more than willing to engage with the academic
community to help resolving in a scalable manner.
This is, as far as we are aware, an active academic research
problem that we would be willing to help resolve in a scalable manner.
\section{Performance}
\label{sec:benchmarks}
Druid runs in production at several organizations, and to demonstrate its
performance, we have chosen to share some real world numbers for the main production
cluster running at Metamarkets in early 2014. For comparison with other databases
cluster running at Metamarkets as of early 2014. For comparison with other databases
we also include results from synthetic workloads on TPC-H data.
\subsection{Query Performance in Production}
@ -789,7 +788,7 @@ based on a given metric is much more expensive than a simple count over a time
range. To showcase the average query latencies in a production Druid cluster,
we selected 8 of our most queried data sources, described in Table~\ref{tab:datasources}.
Approximately 30\% of the queries are standard
Approximately 30\% of queries are standard
aggregates involving different types of metrics and filters, 60\% of queries
are ordered group bys over one or more dimensions with aggregates, and 10\% of
queries are search queries and metadata retrieval queries. The number of
@ -827,7 +826,7 @@ approximately 10TB of segments loaded. Collectively,
there are about 50 billion Druid rows in this tier. Results for
every data source are not shown.
\item The hot tier uses Intel Xeon E5-2670 processors and consists of 1302 processing
\item The hot tier uses Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2670 processors and consists of 1302 processing
threads and 672 total cores (hyperthreaded).
\item A memory-mapped storage engine was used (the machine was configured to
@ -871,8 +870,8 @@ open source column store because we were not confident we could correctly tune
it for optimal performance.
Our Druid setup used Amazon EC2
\texttt{m3.2xlarge} (Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz) instances for
historical nodes and \texttt{c3.2xlarge} (Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz) instances for broker
\texttt{m3.2xlarge} instance types (Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2680 v2 @ 2.80GHz) for
historical nodes and \texttt{c3.2xlarge} instances (Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2670 v2 @ 2.50GHz) for broker
nodes. Our MySQL setup was an Amazon RDS instance that ran on the same \texttt{m3.2xlarge} instance type.
The results for the 1 GB TPC-H data set are shown
@ -918,7 +917,7 @@ well.
To showcase Druid's data ingestion latency, we selected several production
datasources of varying dimensions, metrics, and event volumes. Our production
ingestion setup consists of 6 nodes, totalling 360GB of RAM and 96 cores
(12 x Intel Xeon E5-2670).
(12 x Intel\textsuperscript\textregistered Xeon\textsuperscript\textregistered E5-2670).
Note that in this setup, several other data sources were being ingested and
many other Druid related ingestion tasks were running concurrently on the machines.
@ -974,9 +973,9 @@ running an Amazon \texttt{cc2.8xlarge} instance.
The latency measurements we presented are sufficient to address the stated
problems of interactivity. We would prefer the variability in the latencies to
be less. It is still very possible to decrease latencies by adding
be less. It is still possible to decrease latencies by adding
additional hardware, but we have not chosen to do so because infrastructure
costs are still a consideration to us.
costs are still a consideration for us.
\section{Druid in Production}\label{sec:production}
Over the last few years, we have gained tremendous knowledge about handling
@ -988,8 +987,8 @@ explore use case, the number of queries issued by a single user are much higher
than in the reporting use case. Exploratory queries often involve progressively
adding filters for the same time range to narrow down results. Users tend to
explore short time intervals of recent data. In the generate report use case,
users query for much longer data intervals, but users also already know the
queries they want to issue.
users query for much longer data intervals, but those queries are generally few
and pre-determined.
\paragraph{Multitenancy}
Expensive concurrent queries can be problematic in a multitenant
@ -1005,7 +1004,7 @@ interactivity in this use case as when they are exploring data.
\paragraph{Node failures}
Single node failures are common in distributed environments, but many nodes
failing at once are not. If historical nodes completely fail and do not
recover, their segments need to reassigned, which means we need excess cluster
recover, their segments need to be reassigned, which means we need excess cluster
capacity to load this data. The amount of additional capacity to have at any
time contributes to the cost of running a cluster. From our experiences, it is
extremely rare to see more than 2 nodes completely fail at once and hence, we
@ -1016,10 +1015,10 @@ historical nodes.
Complete cluster failures are possible, but extremely rare. If Druid is
only deployed in a single data center, it is possible for the entire data
center to fail. In such cases, new machines need to be provisioned. As long as
deep storage is still available, cluster recovery time is network bound as
deep storage is still available, cluster recovery time is network bound, as
historical nodes simply need to redownload every segment from deep storage. We
have experienced such failures in the past, and the recovery time was around
several hours in the AWS ecosystem for several TBs of data.
have experienced such failures in the past, and the recovery time was
several hours in the Amazon AWS ecosystem for several terabytes of data.
\subsection{Operational Monitoring}
Proper monitoring is critical to run a large scale distributed cluster.
@ -1035,16 +1034,16 @@ performance and stability of the production cluster. This dedicated metrics
cluster has allowed us to find numerous production problems, such as gradual
query speed degregations, less than optimally tuned hardware, and various other
system bottlenecks. We also use a metrics cluster to analyze what queries are
made in production and what users are most interested in.
made in production and what aspects of the data users are most interested in.
\subsection{Pairing Druid with a Stream Processor}
At the time of writing, Druid can only understand fully denormalized data
Currently, Druid can only understand fully denormalized data
streams. In order to provide full business logic in production, Druid can be
paired with a stream processor such as Apache Storm \cite{marz2013storm}.
A Storm topology consumes events from a data stream, retains only those that are
“on-time”, and applies any relevant business logic. This could range from
simple transformations, such as id to name lookups, up to complex operations
simple transformations, such as id to name lookups, to complex operations
such as multi-stream joins. The Storm topology forwards the processed event
stream to Druid in real-time. Storm handles the streaming data processing work,
and Druid is used for responding to queries for both real-time and
@ -1075,14 +1074,14 @@ Although Druid builds on many of the same principles as other distributed
columnar data stores \cite{fink2012distributed}, many of these data stores are
designed to be more generic key-value stores \cite{lakshman2010cassandra} and do not
support computation directly in the storage layer. There are also other data
stores designed for some of the same of the data warehousing issues that Druid
is meant to solve. These systems include include in-memory databases such as
stores designed for some of the same data warehousing issues that Druid
is meant to solve. These systems include in-memory databases such as
SAPs HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb}. These data
stores lack Druid's low latency ingestion characteristics. Druid also has
native analytical features baked in, similar to \cite{paraccel2013}, however,
native analytical features baked in, similar to ParAccel \cite{paraccel2013}, however,
Druid allows system wide rolling software updates with no downtime.
Druid is similiar to \cite{stonebraker2005c, cipar2012lazybase} in that it has
Druid is similiar to C-Store \cite{stonebraker2005c} and LazyBase \cite{cipar2012lazybase} in that it has
two subsystems, a read-optimized subsystem in the historical nodes and a
write-optimized subsystem in real-time nodes. Real-time nodes are designed to
ingest a high volume of append heavy data, and do not support data updates.
@ -1090,7 +1089,7 @@ Unlike the two aforementioned systems, Druid is meant for OLAP transactions and
not OLTP transactions.
Druid's low latency data ingestion features share some similarities with
Trident/Storm \cite{marz2013storm} and Streaming Spark
Trident/Storm \cite{marz2013storm} and Spark Streaming
\cite{zaharia2012discretized}, however, both systems are focused on stream
processing whereas Druid is focused on ingestion and aggregation. Stream
processors are great complements to Druid as a means of pre-processing the data
@ -1111,7 +1110,7 @@ stores \cite{macnicol2004sybase}.
\section{Conclusions}
\label{sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented, real-time
In this paper we presented Druid, a distributed, column-oriented, real-time
analytical data store. Druid is designed to power high performance applications
and is optimized for low query latencies. Druid supports streaming data
ingestion and is fault-tolerant. We discussed Druid benchmarks and