fix paper some

This commit is contained in:
fjy 2013-12-06 15:59:24 -08:00
parent 7c7cfdf819
commit b9314f40ed
7 changed files with 75 additions and 68 deletions

@@ -141,25 +141,26 @@ to provide a low latency data ingestion and query platform for interactive
applications \cite{tschetter2011druid}. In the early days of Metamarkets, the
company was focused on building a web-based dashboard that would allow users to
arbitrarily explore and visualize event streams. Interactivity was very
important to us, and long delays in updating data visualizations make for a
poor user experience.
In addition to the query latency needs, the system had to be multi-tenant and
highly available. Downtime is costly and many businesses cannot afford to wait
if a system is unavailable in the face of software upgrades or network failure.
Downtime for startups, which often have no internal operations teams, can
determine business success or failure.
Finally, another key problem that Metamarkets faced in the early stages of the
company was allowing users and alerting systems to make business decisions in
real-time. The time from when an event is created to when that event is
queryable determines how fast users and systems are able to react to
potentially catastrophic occurrences in their systems. Popular open source
data warehousing systems such as Hadoop were unable to provide the sub-second
data ingestion latencies we required.
The problems of data exploration, ingestion, and availability span multiple
industries. Since Druid was open sourced in October 2012, it has been deployed
as a video, network monitoring, operations monitoring, and online advertising
analytics platform.
\section{Architecture}
\label{sec:architecture}
@@ -173,7 +174,7 @@ operate fairly independently of each other. To solve complex data analysis
problems, the node types come together to form a fully working system. The
name Druid comes from the Druid class in many role-playing games: it is a
shape-shifter, capable of taking many different forms to fulfill various
different roles in a group. The composition of a Druid cluster and the flow of
data through it are shown in Figure~\ref{fig:cluster}.
\begin{figure*}
@@ -185,12 +186,15 @@ cluster are shown in Figure~\ref{fig:cluster}.
\subsection{Real-time Nodes}
\label{sec:realtime}
Real-time nodes encapsulate the functionality to ingest and query event
streams. Events indexed via these nodes are immediately available for querying.
The nodes are only concerned with events for some small time range and
periodically hand off immutable batches of events they've collected over this
small time range to other nodes in the Druid cluster that are specialized in
dealing with batches of immutable events. Real-time nodes leverage Zookeeper
\cite{hunt2010zookeeper} for coordination with the rest of the Druid cluster.
The nodes announce their online state and the data they are serving in
Zookeeper.
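To make the coordination step concrete, the sketch below shows how such an
announcement might look with the ZooKeeper client API. It is a minimal
illustration only: the znode paths, the payloads, and the assumption that the
parent paths already exist are ours, not Druid's actual layout.

\begin{verbatim}
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Sketch only: paths and payloads are hypothetical.
public class AnnounceSketch {
  public static void main(String[] args) throws Exception {
    CountDownLatch connected = new CountDownLatch(1);
    // Simplification: we treat the first watcher event as "connected".
    ZooKeeper zk = new ZooKeeper("zk.example.com:2181", 15000,
        event -> connected.countDown());
    connected.await();

    // Announce that this real-time node is online.
    // (Assumes the parent znodes already exist.)
    zk.create("/druid/announcements/realtime1",
        "online".getBytes(),
        ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL);

    // Announce the interval of data this node is serving.
    zk.create("/druid/servedSegments/realtime1_13:00-14:00",
        "2013-12-06T13:00/2013-12-06T14:00".getBytes(),
        ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL);
  }
}
\end{verbatim}

Ephemeral znodes are a natural fit here because they disappear when a node's
session ends, keeping the cluster's view of online nodes current without any
explicit deregistration step.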
Real-time nodes maintain an in-memory index buffer for all incoming events.
These indexes are incrementally populated as new events are ingested and the
@@ -201,20 +205,7 @@ periodically or after some maximum row limit is reached. This persist process
converts data stored in the in-memory buffer to the column-oriented storage
format described in Section~\ref{sec:storage-format}. Each persisted index is
immutable and
real-time nodes load persisted indexes into off-heap memory such that they can
still be queried. Figure~\ref{fig:realtime_flow} illustrates the process.
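The following minimal Java sketch illustrates this ingest-and-persist cycle.
The row limit, the class names, and the on-disk format (a plain file rather
than Druid's column-oriented binary format) are placeholders for illustration.

\begin{verbatim}
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: the persist format is a stand-in for the
// column-oriented format of Section "Storage Format".
public class PersistSketch {
  static final int MAX_ROWS = 500_000; // hypothetical row limit

  final List<Map<String, Object>> inMemoryIndex = new ArrayList<>();
  final List<Path> persistedIndexes = new ArrayList<>();

  void ingest(Map<String, Object> event) throws Exception {
    inMemoryIndex.add(event); // incrementally populate the index
    if (inMemoryIndex.size() >= MAX_ROWS) {
      persist(); // in Druid this is also triggered periodically
    }
  }

  void persist() throws Exception {
    // Freeze the current buffer into an immutable on-disk index;
    // a real-time node then maps it off-heap so it stays queryable.
    Path index = Files.createTempFile("index-", ".bin");
    Files.writeString(index, inMemoryIndex.toString());
    persistedIndexes.add(index);
    inMemoryIndex.clear(); // free heap space for new events
  }
}
\end{verbatim}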
\begin{figure}
\centering
@@ -226,9 +217,18 @@ both the in-memory index and the persisted indexes.}
\label{fig:realtime_flow}
\end{figure}
On a periodic basis, each real-time node will schedule a background task that
searches for all locally persisted indexes. The task merges these indexes
together and builds an immutable block of data that contains all the events
that have been ingested by a real-time node for some span of time. We refer to
this block of data as a ``segment''. During the hand-off stage, a real-time
node uploads this segment to permanent backup storage, typically a distributed
file system such as S3 \cite{decandia2007dynamo} or HDFS
\cite{shvachko2010hadoop}, which Druid refers to as ``deep storage''. The
ingest, persist, merge, and handoff steps are fluid; there is no data loss
during this process.
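A minimal sketch of this merge-and-handoff task is shown below. The naive
concatenation merge and the stubbed upload stand in for Druid's actual segment
merging code and its S3/HDFS clients.

\begin{verbatim}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch only: merge and upload are illustrative stubs.
public class HandoffSketch {
  final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  void start(List<Path> persistedIndexes) {
    scheduler.scheduleAtFixedRate(() -> {
      try {
        Path segment = merge(persistedIndexes); // one immutable block
        uploadToDeepStorage(segment);           // hand off
      } catch (IOException e) {
        e.printStackTrace(); // next run retries; nothing is dropped
      }
    }, 10, 10, TimeUnit.MINUTES);
  }

  Path merge(List<Path> indexes) throws IOException {
    Path segment = Files.createTempFile("segment-", ".bin");
    for (Path index : indexes) {
      Files.write(segment, Files.readAllBytes(index),
          StandardOpenOption.APPEND);
    }
    return segment;
  }

  void uploadToDeepStorage(Path segment) {
    // Stub: a real node would PUT the file to S3 or copy it to HDFS.
    System.out.println("handed off " + segment);
  }
}
\end{verbatim}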
To better understand the flow of data
through a real-time node, consider the following example. First, we start a
real-time node at 13:37. The node will announce that it is serving a segment of
data for a period of time from 13:00 to 14:00 and will only accept events with
@@ -236,7 +236,7 @@ timestamps in this time range. Every 10 minutes (the persist period is
configurable), the node will flush and persist its in-memory buffer to disk.
Near the end of the hour, the node will likely see events with timestamps from
14:00 to 15:00. When this occurs, the real-time node prepares to serve data for
the next hour and creates a new in-memory index. The node then announces that
it is also
serving a segment for data from 14:00 to 15:00. The node does not immediately
merge the indexes it persisted from 13:00 to 14:00; instead, it waits for a
configurable window period for straggling events from 13:00 to 14:00 to come
@@ -251,8 +251,7 @@ process is shown in Figure~\ref{fig:realtime_timeline}.
\begin{figure*}
\centering
\includegraphics[width = 4.5in]{realtime_timeline}
\caption{The node starts, ingests data, persists, and periodically hands data
off. This process repeats indefinitely. The time intervals between different
real-time node operations are configurable.}
\label{fig:realtime_timeline}
@@ -299,12 +298,13 @@ approximately 500 MB/s (150,000 events/s or 2 TB/hour).
\subsection{Historical Nodes}
Historical nodes encapsulate the functionality to load and serve the immutable
blocks of data (segments) created by real-time nodes. In many real-world
workflows, most of the data loaded in a Druid cluster is immutable; hence,
historical nodes are typically the main workers of a Druid cluster. Historical
nodes follow a shared-nothing architecture and there is no single point of
contention among the nodes. The nodes have no knowledge of one another and are
operationally simple; they only know how to load, drop, and serve immutable
segments.
Similar to real-time nodes, historical nodes announce their online state and
the data they are serving in Zookeeper. Instructions to load and drop segments
@@ -315,7 +315,7 @@ first checks a local cache that maintains information about what segments
already exist on the node. If information about a segment is not present, the
historical node will proceed to download the segment from deep storage. This
process is shown in Figure~\ref{fig:historical_download}. Once processing is
complete, the segment is announced in Zookeeper. At this point, the
segment is queryable. The local cache also allows historical nodes to be
quickly updated and restarted. On startup, the node examines its cache and
immediately serves whatever data it finds.
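The sketch below illustrates this load path: the node consults its local cache
first and falls back to deep storage only on a miss. The cache location and
the stubbed download and announcement methods are hypothetical.

\begin{verbatim}
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch only: paths and stubs are illustrative.
public class SegmentLoadSketch {
  final Path cacheDir = Paths.get("/tmp/druid/segmentCache");

  void loadSegment(String segmentId) throws Exception {
    Path local = cacheDir.resolve(segmentId);
    if (!Files.exists(local)) {
      downloadFromDeepStorage(segmentId, local); // cache miss
    }
    // After processing, announce the segment; it is now queryable.
    announceInZookeeper(segmentId);
  }

  void downloadFromDeepStorage(String segmentId, Path dest)
      throws Exception {
    Files.createDirectories(dest.getParent());
    Files.createFile(dest); // stub for an S3/HDFS fetch
  }

  void announceInZookeeper(String segmentId) {
    System.out.println("announced " + segmentId); // stub
  }
}
\end{verbatim}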
@@ -329,7 +329,7 @@ immediately serves whatever data it finds.
Historical nodes can support read consistency because they only deal with
immutable data. Immutable data blocks also enable a simple parallelization
model: historical nodes scan and aggregate immutable blocks concurrently
without blocking.
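A minimal sketch of this parallelization model follows; the per-segment
``query'' is a simple row count standing in for a real aggregation.

\begin{verbatim}
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch only: immutable segments allow lock-free parallel scans.
public class ParallelScanSketch {
  record Segment(long numRows) {}

  static long countRows(List<Segment> segments) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors());
    try {
      List<Callable<Long>> scans = segments.stream()
          .map(s -> (Callable<Long>) s::numRows) // one scan per segment
          .toList();
      long total = 0;
      for (Future<Long> partial : pool.invokeAll(scans)) {
        total += partial.get(); // merge partial results
      }
      return total;
    } finally {
      pool.shutdown();
    }
  }
}
\end{verbatim}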
\subsubsection{Tiers}
@@ -820,23 +820,23 @@ with the following configurations:
\item CPU: 2.3 GHz Intel Core i7
\end{itemize}
Druid's data ingestion latency is heavily dependent on the complexity of the
data set being ingested. The data complexity is determined by the number of
dimensions in each event, the number of metrics in each event, and the types of
aggregations we want to perform on those metrics. With the most basic data set
(one that only has a timestamp column), our setup can ingest data at a rate of
800k events/sec/node, which is really just a measurement of how fast we can
deserialize events. Real world data sets are never this simple. To simulate
real-world ingestion rates, we created a data set with 5 dimensions and a
single metric. 4 out of the 5 dimensions have a cardinality less than 100, and
we varied the cardinality of the final dimension. The results of varying the
cardinality of a dimension are shown in
Figure~\ref{fig:throughput_vs_cardinality}.
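The sketch below shows one way the synthetic events for this experiment could
be generated; the dimension and metric names are illustrative, and this is not
the generator we actually used.

\begin{verbatim}
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Sketch only: 4 low-cardinality dimensions, 1 varied dimension,
// and a single metric, as described in the text.
public class SyntheticEventSketch {
  static Map<String, Object> nextEvent(Random rng,
                                       int variedCardinality) {
    Map<String, Object> event = new HashMap<>();
    event.put("timestamp", System.currentTimeMillis());
    for (int d = 1; d <= 4; d++) {
      event.put("dim" + d, "value" + rng.nextInt(100)); // card. < 100
    }
    event.put("dim5", "value" + rng.nextInt(variedCardinality));
    event.put("metric1", rng.nextInt(1000)); // the single metric
    return event;
  }
}
\end{verbatim}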
\begin{figure}
\centering
\includegraphics[width = 2.8in]{throughput_vs_cardinality}
\caption{When we vary the cardinality of a single dimension, we see monotonically decreasing throughput.}
\label{fig:throughput_vs_cardinality}
\end{figure}
In Figure~\ref{fig:throughput_vs_num_dims}, we instead vary the number of
@@ -846,21 +846,28 @@ increases.
\begin{figure}
\centering
\includegraphics[width = 2.8in]{throughput_vs_num_dims}
\caption{Increasing the number of dimensions of our data set also leads to a decline in throughput.}
\label{fig:throughput_vs_num_dims}
\end{figure}
Finally, keeping our number of dimensions constant at 5, with four dimensions
having a cardinality in the 0-100 range and the final dimension having a
cardinality of 10,000, we see a similar decline in throughput when we increase
the number of metrics/aggregators in the data set. We used random types of
metrics/aggregators in this experiment, ranging from longs and doubles to more
complex types. The randomization introduces more noise in the results, leading
to a graph that is not strictly decreasing. These results are shown in
Figure~\ref{fig:throughput_vs_num_metrics}. For most real world data sets, the
number of metrics tends to be less than the number of dimensions. Hence, we
can see that introducing a few new metrics does not substantially impact the
ingestion latency.
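As a rough illustration of why the metric type matters, the sketch below mixes
a cheap long sum, a cheap double sum, and a costlier ``complex'' aggregation
(a naive distinct count); these stand in for Druid's actual aggregator
implementations.

\begin{verbatim}
import java.util.HashSet;
import java.util.Set;

// Sketch only: per-event cost varies by aggregator type.
public class AggregatorSketch {
  long longSum;
  double doubleSum;
  final Set<Object> distinctValues = new HashSet<>();

  void aggregate(long l, double d, Object complexInput) {
    longSum += l;                     // one addition per event
    doubleSum += d;                   // one addition per event
    distinctValues.add(complexInput); // hashing + set upkeep per event
  }
}
\end{verbatim}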
\begin{figure}
\centering
\includegraphics[width = 2.8in]{throughput_vs_num_metrics}
\caption{Adding new metrics to a data set decreases ingestion throughput. In
most real world data sets, the number of metrics in a data set tends to be
lower than the number of dimensions.}
\label{fig:throughput_vs_num_metrics}
\end{figure}
\section{Related Work}
