mirror of https://github.com/apache/druid.git
fix broker datasources path
This commit is contained in:
parent
9e54e11827
commit
12e0dbce68
@@ -215,7 +215,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
       );

       root.addEventListener(new GuiceServletConfig(injector));
-      root.addFilter(GuiceFilter.class, "/datasources/*", 0);
+      root.addFilter(GuiceFilter.class, "/*", 0);

       for (String path : pathsForGuiceFilter) {
         root.addFilter(GuiceFilter.class, path, 0);
@@ -42,7 +42,7 @@ import java.util.Set;

 /**
  */
-@Path("/datasources")
+@Path("/druid/v2/datasources")
 public class ClientInfoResource
 {
   private static final int SEGMENT_HISTORY_MILLIS = 7 * 24 * 60 * 60 * 1000; // ONE WEEK
Binary file not shown.

@@ -0,0 +1,955 @@
% THIS IS AN EXAMPLE DOCUMENT FOR VLDB 2012
% based on ACM SIGPROC-SP.TEX VERSION 2.7
% Modified by Gerald Weber <gerald@cs.auckland.ac.nz>
% Removed the requirement to include *bbl file in here. (AhmetSacan, Sep2012)
% Fixed the equation on page 3 to prevent line overflow. (AhmetSacan, Sep2012)

\documentclass{vldb}
\usepackage{graphicx}
\usepackage{balance}  % for \balance command ON LAST PAGE (only there!)
\usepackage{fontspec}
\setmainfont[Ligatures={TeX}]{Times}
\usepackage{hyperref}
\graphicspath{{figures/}}

\hyphenation{metamarkets nelson}

\begin{document}

% ****************** TITLE ****************************************

\title{Druid: Real-time Analytical Data Store}

% possible, but not really needed or used for PVLDB:
%\subtitle{[Extended Abstract]
%\titlenote{A full version of this paper is available as\textit{Author's Guide to Preparing ACM SIG Proceedings Using \LaTeX$2_\epsilon$\ and BibTeX} at \texttt{www.acm.org/eaddress.htm}}}

% ****************** AUTHORS **************************************

% You need the command \numberofauthors to handle the 'placement
% and alignment' of the authors beneath the title.
%
% For aesthetic reasons, we recommend 'three authors at a time'
% i.e. three 'name/affiliation blocks' be placed beneath the title.
%
% NOTE: You are NOT restricted in how many 'rows' of
% "name/affiliations" may appear. We just ask that you restrict
% the number of 'columns' to three.
%
% Because of the available 'opening page real-estate'
% we ask you to refrain from putting more than six authors
% (two rows with three columns) beneath the article title.
% More than six makes the first-page appear very cluttered indeed.
%
% Use the \alignauthor commands to handle the names
% and affiliations for an 'aesthetic maximum' of six authors.
% Add names, affiliations, addresses for
% the seventh etc. author(s) as the argument for the
% \additionalauthors command.
% These 'additional authors' will be output/set for you
% without further effort on your part as the last section in
% the body of your article BEFORE References or any Appendices.

\numberofauthors{7} % in this sample file, there are a *total*
% of EIGHT authors. SIX appear on the 'first-page' (for formatting
% reasons) and the remaining two appear in the \additionalauthors section.

\author{
% You can go ahead and credit any number of authors here,
% e.g. one 'row of three' or two rows (consisting of one row of three
% and a second row of one, two or three).
%
% The command \alignauthor (no curly braces needed) should
% precede each author name, affiliation/snail-mail address and
% e-mail address. Additionally, tag each line of
% affiliation/address with \affaddr, and tag the
% e-mail address with \email.
%
% 1st. author
\alignauthor Fangjin Yang\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{fangjin@metamarkets.com}
\alignauthor Eric Tschetter\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{eric@metamarkets.com}
\alignauthor Gian Merlino\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{gian@metamarkets.com}
\and
\alignauthor Nelson Ray\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{nelson@metamarkets.com}
\alignauthor Xavier Léauté\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{xavier@metamarkets.com}
\alignauthor Deep Ganguli\\
\affaddr{Metamarkets, Inc.}\\
\affaddr{625 2nd Street, Suite 230}\\
\affaddr{San Francisco, CA, USA}\\
\email{deep@metamarkets.com}
}
% There's nothing stopping you putting the seventh, eighth, etc.
% author on the opening page (as the 'third row') but we ask,
% for aesthetic reasons that you place these 'additional authors'
% in the \additional authors block, viz.
\additionalauthors{Michael Driscoll (Metamarkets, \texttt{mike@metamarkets.com})}
\date{21 March 2013}
% Just remember to make sure that the TOTAL number of authors
% is the number that will appear on the first page PLUS the
% number that will appear in the \additionalauthors section.

\maketitle

\begin{abstract}
Druid is a scalable, real-time analytical data store that supports
ad-hoc queries on large-scale data sets. The system combines a
columnar data layout, a shared-nothing architecture, and an advanced
indexing structure to allow arbitrary exploration of billion-row
tables with sub-second latencies. Druid scales horizontally and is the
core engine of the Metamarkets platform. In this paper, we detail the
architecture and implementation of Druid and describe how it solves
the real-time data ingestion problem.
\end{abstract}

\section{Introduction}
In recent years, enterprises have been facing ever-growing collections of
data in all forms. The scale of data has reached terabytes, even
petabytes of information per day. Companies are increasingly realizing
the importance of unlocking the insights contained within this
data. Numerous database vendors such as IBM’s Netezza \cite{singh2011introduction}, Vertica
\cite{bear2012vertica}, and EMC’s Greenplum \cite{miner2012unified} offer data warehousing solutions, and
several research papers \cite{barroso2009datacenter,
chaudhuri1997overview, dewitt1992parallel} directly address this problem as
well. As the interactive data exploration space becomes more mature,
it is apparent that real-time ingestion and exploration of data will
unlock business-critical decisions for front office and back office
analysts.

Metamarkets realized early on that for high data volumes, existing
Relational Database Management Systems (RDBMS) and most NoSQL
architectures were not sufficient to address the performance and
use-case needs of the business intelligence space. Druid was built to
address a gap we believe exists in the current big-data ecosystem for
a real-time analytical data store. Druid is a distributed, columnar,
shared-nothing data store designed to reliably scale to petabytes of
data and thousands of cores. It is a highly available system designed
to run on commodity hardware with no downtime in the face of failures,
data imports, or software updates. We have been building Druid over the
course of the last two years and it is the core engine of the
Metamarkets technology stack.

In many ways, Druid shares similarities with other interactive query systems
\cite{melnik2010dremel}, main-memory databases \cite{farber2012sap}, and widely-known distributed data
stores such as BigTable \cite{chang2008bigtable}, Dynamo \cite{decandia2007dynamo}, and Cassandra \cite{lakshman2010cassandra}. Unlike
most traditional data stores, Druid operates mainly on read-only data
and has limited functionality for writes. The system is highly optimized
for large-scale transactional data aggregation and arbitrarily deep data exploration. Druid is highly configurable
and allows users to adjust levels of fault tolerance and
performance.

Druid builds on the ideas of other distributed data stores, real-time
computation engines, and search engine indexing algorithms. In this
paper, we make the following contributions to academia:
\begin{itemize}
\item We outline Druid’s real-time ingestion and query capabilities
  and explain how we can explore events within milliseconds of their
  creation.
\item We describe how the architecture allows for fast and flexible
  queries and how inverted indices can be applied to quickly filter
  data.
\item We present experiments benchmarking Druid’s performance.
\end{itemize}
The format of the paper is as follows: Section \ref{sec:data-model} describes the Druid
data model. Section \ref{sec:cluster} presents an overview of the components of a
Druid cluster. Section \ref{sec:query-api} outlines the query API. Section \ref{sec:storage} describes
the data storage format in greater detail. Section \ref{sec:robustness} discusses Druid
robustness and failure responsiveness. Section \ref{sec:benchmarks} provides our
performance benchmarks. Section \ref{sec:related} lists related work and Section \ref{sec:conclusions} presents
our conclusions.

\section{Data Model}
\label{sec:data-model}
The fundamental storage unit in Druid is the segment. Distribution and
replication in Druid are always done at a segment level. Segments
encapsulate a partition of a larger transactional data set. To better
understand how a typical row/column collection of data translates into
a Druid segment, consider the data set shown in Table~\ref{tab:sample_data}.

\begin{table*}
  \centering
  \caption{Sample Druid data}
  \label{tab:sample_data}
  \texttt{
    \begin{tabular}{ l l l l l l l l }
      \hline
      \textbf{Timestamp} & \textbf{Publisher} & \textbf{Advertiser} & \textbf{Gender} & \textbf{Country} & \textbf{Impressions} & \textbf{Clicks} & \textbf{Revenue} \\
      2011-01-01T01:00:00Z & bieberfever.com & google.com & Male & USA & 1800 & 25 & 15.70 \\
      2011-01-01T01:00:00Z & bieberfever.com & google.com & Male & USA & 2912 & 42 & 29.18 \\
      2011-01-01T02:00:00Z & ultratrimfast.com & google.com & Male & USA & 1953 & 17 & 17.31 \\
      2011-01-01T02:00:00Z & ultratrimfast.com & google.com & Male & USA & 3194 & 170 & 34.01 \\
    \end{tabular}
  }
\end{table*}

A segment is composed of multiple binary files, each representing a
column of a data set. The data set in Table~\ref{tab:sample_data} consists of 8 distinct
columns, one of which is the timestamp column. Druid always requires a
timestamp column because it (currently) only operates with event-based
data. Segments always represent some time interval and each column
file contains the specific values for that column over the time
interval. Since segments always contain data for a time range, it is
logical that Druid partitions data into smaller chunks based on the
timestamp value. In other words, segments can be thought of as blocks
of data that represent a certain granularity of time. For example, if
we wanted to shard the data in Table~\ref{tab:sample_data} to an hourly granularity, the
partitioning algorithm would result in two segments, one representing
each hour of 2011-01-01. Similarly, if we sharded the data to a daily
granularity, we would create a single segment for 2011-01-01.

Partitioning the data based on granularity buckets allows users to
fine-tune the degree of parallelization they want in Druid. A data set
representing a year’s worth of data may be bucketed by day, and a data
set representing only a day’s worth of data may be partitioned by
hour. Sharding on a single dimension (time) may still result in segments that are too large to manage if the data volume is sufficiently high.
To create more operable partition chunks, Druid may
additionally shard data based on other factors such as dimension
cardinality. Each shard creates a segment and hence, segments are uniquely identified by a data source
id describing the data, the time interval of the data, a version
indicating when the segment was created, and a shard partition number.
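
As a concrete illustration, the sketch below composes a segment
identifier from the four properties listed above. The identifier
format and the class name are our own, chosen only for illustration;
they are not taken from the Druid codebase.
\begin{verbatim}
// Illustrative only: builds a segment identifier from the data source,
// time interval, version, and shard partition number described above.
public class SegmentIdExample
{
  static String segmentId(String dataSource, String start, String end,
                          String version, int partitionNum)
  {
    return dataSource + "_" + start + "_" + end
        + "_" + version + "_" + partitionNum;
  }

  public static void main(String[] args)
  {
    // One hourly shard of the sample data set in Table 1.
    System.out.println(segmentId("sample_data",
        "2011-01-01T01:00:00Z", "2011-01-01T02:00:00Z", "v1", 0));
  }
}
\end{verbatim}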

Data in Druid can be conceptually thought of as being either real-time
or historical. Real-time data refers to recently ingested data;
typically, in a production setting, this will be data for the current
hour. Historical data refers to any data that is older. Segments can
be similarly classified as being either real-time or historical.

Historical segments are immutable and do not support insert, delete,
or update semantics. By maintaining immutable blocks of data within
the system, we can maintain a consistent snapshot of historical
data and provide read consistency without having to worry about
concurrent updates and deletes. If updates to a historical segment are
required, we build a new segment for the same data source and time
interval with the updated data. This new segment will have an updated version identifier.

Multiple segments for the same data source and time range
may exist in the system at any time. To provide read consistency,
Druid read operations always access data in a particular time range
from the segment with the latest version identifier for that time
range. Historical segments may be stored on local disk or in a
key-value “deep” store such as S3 \cite{decandia2007dynamo} or HDFS \cite{shvachko2010hadoop}. All historical
segments have associated metadata describing properties of the segment
such as size in bytes, compression format, and location in deep
storage.

Real-time segments are mutable and generally represent a much shorter
duration of time than historical segments. Real-time segments contain
recent data and are incrementally populated as new events are
ingested. On a periodic basis, real-time segments are converted into
historical segments. Additional details about this conversion process
are given in Section~\ref{sec:realtime}.

\section{Cluster}
\label{sec:cluster}
A Druid cluster consists of different types of nodes, each performing
a specific function. The composition of a Druid cluster is shown in
Figure~\ref{fig:druid_cluster}.

\begin{figure*}
  \centering
  \includegraphics[width = 6in]{druid_cluster}
  \caption{An overview of a Druid cluster.}
  \label{fig:druid_cluster}
\end{figure*}

Recall that data in Druid is classified as either real-time or
historical. The Druid cluster is architected to reflect this
conceptual separation of data. Real-time nodes are responsible for
ingesting, storing, and responding to queries for the most recent
events. Similarly, historical compute nodes are responsible for
loading and responding to queries for historical events.

Data in Druid is stored on storage nodes. Storage nodes can be either
compute or real-time nodes. Queries to access this data will
typically first hit a layer of broker nodes. Broker nodes are
responsible for finding and routing queries down to the storage nodes
that host the pertinent data. The storage nodes compute their portion
of the query response in parallel and return their results to the
brokers. Broker nodes, compute nodes, and real-time nodes can all be
classified as queryable nodes.

Druid also has a set of coordination nodes to manage load assignment,
distribution, and replication. Coordination nodes are not queryable
and instead focus on maintaining cluster stability. Coordination nodes
have an external dependency on a MySQL database.

Druid relies on an Apache Zookeeper \cite{hunt2010zookeeper} cluster
for coordination. This dependency is required because there is no
direct coordination-related communication between Druid nodes. The
following sections will discuss each Druid component in greater
detail.

\subsection{Apache Zookeeper}
Zookeeper is a service for coordinating processes of distributed
applications. Zookeeper provides connecting applications with an
abstraction of a hierarchy of data nodes known as znodes. Each znode
is part of a hierarchical namespace, similar to file
systems. Zookeeper has the concept of ephemeral and permanent
znodes. Permanent znodes must be created and destroyed explicitly by a
connecting application. Ephemeral znodes can be created by connecting
applications and deleted either explicitly or if the session that
created the znode is terminated (such as in the event of service
failure).

\subsection{Historical Compute Nodes}
Historical compute nodes are the main workers of a Druid cluster and
are self-contained and self-sufficient. Compute nodes load historical
segments from permanent/deep storage and expose them for
querying. There is no single point of contention between the nodes and
nodes have no knowledge of one another. Compute nodes are
operationally simple; they only know how to perform the tasks they are
assigned. To help other services discover compute nodes and the data
they hold, every compute node maintains a constant Zookeeper
connection. Compute nodes announce their online state and the segments
they serve by creating ephemeral znodes under specifically configured
Zookeeper paths. Instructions for a given compute node to load new
segments or drop existing segments are sent by creating ephemeral
znodes under a special “load queue” path associated with the compute
node.
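
As a rough sketch of this announcement mechanism, the example below
uses the standard Apache ZooKeeper Java client to create an ephemeral
znode for a served segment. The connection string, znode path, and
payload are placeholders for illustration and are not the paths Druid
actually uses.
\begin{verbatim}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class AnnounceSegmentExample
{
  public static void main(String[] args) throws Exception
  {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});

    // Placeholder path; assumes the parent znodes already exist.
    String path = "/druid/announcements/compute-node-1/sample_segment";
    byte[] metadata = "{\"size\":100000}".getBytes("UTF-8");

    // Ephemeral: the znode disappears automatically if this node's
    // ZooKeeper session ends, e.g. when the process fails.
    zk.create(path, metadata,
              ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  }
}
\end{verbatim}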

To expose a segment for querying, a compute node must first possess a
local copy of the segment. Before a compute node downloads a segment
from deep storage, it first checks a local disk directory (cache) to
see if the segment already exists in local storage. If no cache
information about the segment is present, the compute node will
download metadata about the segment from Zookeeper. This metadata
includes information about where the segment is located in deep
storage and about how to decompress and process the segment. Once a
compute node completes processing a segment, the node announces (in
Zookeeper) that it is serving the segment. At this point, the segment
is queryable.

\subsubsection{Tiers}
\label{sec:tiers}
Compute nodes can be grouped in different tiers, where all nodes in a
given tier are identically configured. Different performance and
fault-tolerance parameters can be set for each tier. The purpose of
tiered nodes is to enable higher or lower priority segments to be
distributed according to their importance. For example, it is possible
to spin up a “hot” tier of compute nodes that have a high number of
cores and a large RAM capacity. The “hot” cluster can be configured to
download more frequently accessed segments. A parallel “cold” cluster
can also be created with much less powerful backing hardware. The
“cold” cluster would only contain less frequently accessed segments.

\subsection{Real-time Nodes}
\label{sec:realtime}
Real-time nodes encapsulate the functionality to ingest and query
real-time data streams. Data indexed via these nodes is immediately
available for querying. Real-time nodes are consumers of data and
require a corresponding producer to provide the data
stream. Typically, for data durability purposes, a message bus such as
Kafka \cite{kreps2011kafka} sits between the producer and the real-time node as shown
in Figure~\ref{fig:data-ingestion}.

\begin{figure}
  \centering
  \includegraphics[width = 3in]{druid_message_bus}
  \caption{Real-time data ingestion.}
  \label{fig:data-ingestion}
\end{figure}

The purpose of the message bus in Figure~\ref{fig:data-ingestion} is to act as a buffer for
incoming events. The message bus can maintain offsets indicating the
position in an event stream that a real-time node has read up to and
real-time nodes can update these offsets periodically. The message bus also acts as backup storage for recent events.
Real-time nodes ingest data by reading events from the message bus. The time from event creation to message bus storage to
event consumption is on the order of hundreds of milliseconds.

Real-time nodes maintain an in-memory index for all incoming
events. These indexes are incrementally populated as new events appear on the message bus. The indexes are also directly queryable.
Real-time nodes persist their indexes to disk either periodically or after some maximum row limit is
reached. After each persist, a real-time node updates the message bus
with the offset of the last event of the most recently persisted
index. Each persisted index is immutable. If a real-time node fails and recovers, it can simply reload
any indexes that were persisted to disk and continue reading the
message bus from the point the last offset was committed. Periodically committing offsets reduces the number of messages a real-time
node has to rescan after a failure scenario.
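
The sketch below illustrates this persist-then-commit ordering. The
in-memory ``index'' and the message bus are modeled with plain Java
collections purely for illustration; none of the names correspond to
Druid classes.
\begin{verbatim}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RealtimeIngestSketch
{
  static final int MAX_ROWS_PER_PERSIST = 3;

  public static void main(String[] args)
  {
    Deque<String> messageBus = new ArrayDeque<>(
        List.of("e1", "e2", "e3", "e4", "e5", "e6", "e7"));
    List<String> inMemoryIndex = new ArrayList<>();
    long readOffset = 0;
    long committedOffset = 0;

    while (!messageBus.isEmpty()) {
      inMemoryIndex.add(messageBus.poll());
      readOffset++;
      if (inMemoryIndex.size() >= MAX_ROWS_PER_PERSIST) {
        // "Persist" the index, then advance the committed offset.
        // After a failure, rescanning starts from committedOffset.
        System.out.println("persisted " + inMemoryIndex
            + " up to offset " + readOffset);
        inMemoryIndex.clear();
        committedOffset = readOffset;
      }
    }
    System.out.println("committed offset: " + committedOffset);
  }
}
\end{verbatim}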

Real-time nodes maintain a consolidated view of the currently updating
index and of all indexes persisted to disk. This unified view allows
all indexes on a node to be queried. On a periodic basis, the nodes will
schedule a background task that searches for all persisted indexes of
a data source. The task merges these indexes together and builds a
historical segment. The nodes will upload the segment to deep storage
and provide a signal for the historical compute nodes to begin serving
the segment. The ingest, persist, merge, and handoff steps are fluid;
there is no data loss as a real-time node converts a real-time segment
to a historical one. Figure~\ref{fig:data-durability} illustrates the process.

\begin{figure}
  \centering
  \includegraphics[width = 3in]{druid_realtime_flow}
  \caption{Real-time data durability}
  \label{fig:data-durability}
\end{figure}

Similar to compute nodes, real-time nodes announce segments in
Zookeeper. Unlike historical segments, real-time segments may
represent a period of time that extends into the future. For example,
a real-time node may announce it is serving a segment that contains
data for the current hour. Before the end of the hour, the real-time
node continues to collect data for the hour. Every 10 minutes (the
persist period is configurable), the node will flush and persist its
in-memory index to disk. At the end of the current hour, the real-time
node prepares to serve data for the next hour by creating a new index
and announcing a new segment for the next hour. The node does not
immediately merge and build a historical segment for the previous hour
until after some window period has passed. Having a window period
allows for straggling data points to come in and minimizes the risk of
data loss. At the end of the window period, the real-time node will
merge all persisted indexes, build a historical segment for the
previous hour, and hand the segment off to historical nodes to
serve. Once the segment is queryable on the historical nodes, the
real-time node flushes all information about the segment and
unannounces that it is serving the segment.

Real-time nodes are highly scalable. If the data volume and ingestion
rates for a given data source exceed the maximum capabilities of a
single node, additional nodes can be added. Multiple nodes can
consume events from the same stream, and every individual node only
holds a portion of the total number of events. This creates natural
partitions across nodes. Each node announces the real-time segment it
is serving and each real-time segment has a partition number. Data
from individual nodes will be merged at the Broker level. To our
knowledge, the largest production-level real-time Druid cluster is
consuming approximately 2 TB of raw data per hour.

\subsection{Broker Nodes}
Broker nodes act as query routers to other queryable nodes such as
compute and real-time nodes. Broker nodes understand the metadata
published in Zookeeper about what segments exist and on what nodes the
segments are stored. Broker nodes route incoming queries such that the queries hit
the right storage nodes. Broker nodes also merge partial results from
storage nodes before returning a final consolidated result to the
caller. Additionally, brokers provide an extra level of data
durability as they maintain a cache of recent results. In the event
that multiple storage nodes fail and all copies of a segment are
somehow lost, it is still possible that segment results can
be returned if that information exists in the cache.

\subsubsection{Timeline}
To determine the correct nodes to forward queries to, Broker nodes
first build a view of the world from information in Zookeeper. Recall
that Druid uses Zookeeper to maintain information about all compute
and real-time nodes in a cluster and the segments those nodes are
serving. For every data source in Zookeeper, the Broker node builds a
timeline of segments for the data source and the nodes that serve them. A timeline
consists of segments and represents which segments contain data for
what ranges of time. Druid may have multiple segments where the data
source and interval are the same but versions differ. The timeline
view will always surface segments with the most recent version
identifier for a time range. If two segments' intervals overlap, the segment with the more recent
version always has precedence. When queries are received for a specific
data source and interval, the Broker node performs a lookup on the
timeline associated with the query data source for the query interval
and retrieves the segments that contain data for the query. The broker
node maps these segments to the storage nodes that serve them and
forwards the query down to the respective nodes.
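
A minimal sketch of this version-based selection is shown below,
assuming intervals are simple hour-aligned strings and that version
identifiers compare lexicographically; real timeline construction also
has to resolve partially overlapping intervals.
\begin{verbatim}
import java.util.HashMap;
import java.util.Map;

public class TimelineSketch
{
  // interval -> "version:segment" of the latest version seen so far
  static void announce(Map<String, String> timeline, String interval,
                       String version, String segment)
  {
    timeline.merge(interval, version + ":" + segment,
        (oldVal, newVal) -> newVal.compareTo(oldVal) > 0 ? newVal : oldVal);
  }

  public static void main(String[] args)
  {
    Map<String, String> timeline = new HashMap<>();
    announce(timeline, "2011-01-01T01/02", "v1", "segA");
    announce(timeline, "2011-01-01T01/02", "v2", "segB"); // overshadows segA
    announce(timeline, "2011-01-01T02/03", "v1", "segC");
    System.out.println(timeline); // segB and segC are surfaced, segA is not
  }
}
\end{verbatim}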

\subsubsection{Caching}
\label{sec:caching}
Broker nodes employ a distributed cache with an LRU \cite{o1993lru,
kim2001lrfu} cache invalidation strategy. The broker cache stores
per-segment results. The cache can be local to each broker node or
shared across multiple nodes using an external distributed cache
such as memcached \cite{fitzpatrick2004distributed}. Recall that each time a broker node receives a
query, it first maps the query to a set of segments. A subset of
these segment results may already exist in the cache and the results
can be directly pulled from the cache. For any segment results that
do not exist in the cache, the broker node will forward the query
to the compute nodes. Once the compute nodes return their results,
the broker will store those results in the cache. Real-time segments
are never cached and hence requests for real-time data will always
be forwarded to real-time nodes. Real-time data is perpetually
changing and caching the results would be unreliable.
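
The sketch below shows the per-segment cache lookup pattern described
above. The cache key layout, the query hash, and the
\texttt{queryStorageNode} call are placeholders for illustration.
\begin{verbatim}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BrokerCacheSketch
{
  static final Map<String, String> cache = new HashMap<>();

  static List<String> query(String queryHash, List<String> segments,
                            List<String> realtimeSegments)
  {
    List<String> results = new ArrayList<>();
    for (String segment : segments) {
      String key = queryHash + "_" + segment;
      boolean realtime = realtimeSegments.contains(segment);
      String result = realtime ? null : cache.get(key);
      if (result == null) {
        result = queryStorageNode(segment);  // forward to a storage node
        if (!realtime) {
          cache.put(key, result);            // only historical results cached
        }
      }
      results.add(result);
    }
    return results;
  }

  static String queryStorageNode(String segment)
  {
    return "partial-result-for-" + segment;  // stand-in for a remote call
  }

  public static void main(String[] args)
  {
    System.out.println(query("q1", List.of("seg1", "seg2"), List.of("seg2")));
  }
}
\end{verbatim}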

\subsection{Coordination (Master) Nodes}
The Druid coordination or master nodes are primarily in charge of
segment management and distribution. The Druid master is responsible
for loading new segments, dropping outdated segments, managing segment
replication, and balancing segment load. Druid uses a multi-version
concurrency control swapping protocol for managing segments in order
to maintain stable views.

The Druid master runs periodically to determine the current state of
the cluster. It makes decisions by comparing the expected state of the
cluster with the actual state of the cluster at the time of the
run. As with all Druid nodes, the Druid master maintains a connection
to Zookeeper for current cluster information. The master also
maintains a connection to a MySQL database that contains additional
operational parameters and configurations. One of the key pieces of
information located in the MySQL database is a segment table that
contains a list of historical segments that should be served. This
table can be updated by any service that creates historical
segments. The MySQL database also contains a rule table that governs
how segments are created, destroyed, and replicated in the cluster.

The master does not directly communicate with a compute node when
assigning it work; instead the master creates an ephemeral znode in
Zookeeper containing information about what the compute node should
do. The compute node maintains a similar connection to Zookeeper to
monitor for new work.

\subsubsection{Rules}
Historical segments are loaded and dropped from the cluster based on a
set of rules. Rules indicate how segments should be assigned to
different compute node tiers and how many replicates of a segment
should exist in each tier. Rules may also indicate when segments
should be dropped entirely from the cluster. The master loads a set of
rules from a rule table in the MySQL database. Rules may be specific
to a certain data source and/or a default set of rules can be
configured. The master will cycle through all available segments and
match each segment with the first rule that applies to it.
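
A minimal sketch of this first-match semantics is shown below. Rules
are reduced to a predicate plus an action label; Druid's actual rules
(load, drop, tier, replication count) carry more structure.
\begin{verbatim}
import java.util.List;
import java.util.function.Predicate;

public class RuleMatchSketch
{
  record Rule(Predicate<String> appliesTo, String action) {}

  static String actionFor(String dataSource, List<Rule> rules)
  {
    for (Rule rule : rules) {
      if (rule.appliesTo().test(dataSource)) {
        return rule.action();          // first matching rule wins
      }
    }
    return "no-op";
  }

  public static void main(String[] args)
  {
    List<Rule> rules = List.of(
        new Rule(ds -> ds.equals("sample_data"),
                 "load 2 replicates in the hot tier"),
        new Rule(ds -> true, "default: load 1 replicate in the cold tier"));
    System.out.println(actionFor("sample_data", rules));
    System.out.println(actionFor("other_data", rules));
  }
}
\end{verbatim}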

\subsubsection{Load Balancing}
In a typical production environment, queries often hit dozens or even
hundreds of data segments. Since each compute node has limited
resources, historical segments must be distributed among the cluster
to ensure that the cluster load is not too imbalanced. Determining
optimal load distribution requires some knowledge about query patterns
and speeds. Typically, queries cover recent data spanning contiguous
time intervals for a single data source. On average, queries that
access smaller segments are faster.

These query patterns suggest replicating recent historical segments at
a higher rate, spreading out large segments that are close in time to
different compute nodes, and co-locating segments from different data
sources. To optimally distribute and balance segments among the
cluster, we developed a cost-based optimization procedure that takes
into account the segment data source, recency, and size. The exact
details of the algorithm are beyond the scope of this paper.

\section{Query API}
\label{sec:query-api}
Queries to Druid are made in the form of POST requests. All queryable
Druid nodes share the same query API. The body of the POST request is
a JSON object containing key-value pairs specifying various query
parameters. A typical query will contain the data source name, the
granularity of the result data, the time range of interest, and the
type of request. A sample time series query is shown below:
\begin{verbatim}
{
  "queryType"    : "timeseries",
  "dataSource"   : "sample_data",
  "intervals"    : "2013-01-01/2013-01-02",
  "filter"       : {
    "type"      : "selector",
    "dimension" : "poets",
    "value"     : "Ke$ha"
  },
  "granularity"  : "day",
  "aggregations" : [
    {
      "type"      : "count",
      "fieldName" : "row",
      "name"      : "row"
    }
  ]
}
\end{verbatim}
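
For illustration, the snippet below issues such a query over HTTP from
Java. The broker host, port, and endpoint path are assumptions made
for the example rather than documented values, and the query body is
an abbreviated version of the JSON above.
\begin{verbatim}
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PostQueryExample
{
  public static void main(String[] args) throws Exception
  {
    String query =
        "{\"queryType\":\"timeseries\",\"dataSource\":\"sample_data\","
        + "\"intervals\":\"2013-01-01/2013-01-02\",\"granularity\":\"day\","
        + "\"aggregations\":[{\"type\":\"count\","
        + "\"fieldName\":\"row\",\"name\":\"row\"}]}";

    URL broker = new URL("http://localhost:8080/druid/v2/");
    HttpURLConnection conn = (HttpURLConnection) broker.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    conn.getOutputStream().write(query.getBytes(StandardCharsets.UTF_8));

    try (InputStream in = conn.getInputStream()) {
      System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }
  }
}
\end{verbatim}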

Certain query types will also support a filter set. A filter set is an
arbitrary Boolean expression of dimension name and value
pairs. Support for complex nested filter sets enables flexibility and
provides the ability to deeply explore data.

The exact query syntax depends on the query type and the information requested.
It is beyond the scope of this paper to describe the Query API in full detail.
We are also in the process of building out SQL support for Druid.

\section{Storage}
\label{sec:storage}
Druid is a column-oriented data store. When considering aggregates
over a large number of events, the advantages of storing data as columns
rather than rows are well documented \cite{cattell2011scalable}. Column storage allows for
more efficient CPU usage as only what is needed is actually loaded and
scanned. In a row-oriented data store, all columns associated with a
row must be scanned as part of an aggregation. The additional scan
time can introduce performance degradations as high as 250\% \cite{bear2012vertica}.

\subsection{Column Types}
Druid has multiple column types to represent the various column value
formats. Depending on the column type, different compression methods
are used to reduce the cost of storing a column in memory and on
disk. In the example given in Table~\ref{tab:sample_data}, the
publisher, advertiser, gender, and country columns only contain
strings. String columns can be dictionary encoded. Dictionary encoding
is a common method to compress data and has been used in other data
stores such as Powerdrill \cite{hall2012processing}. In the example in
Table~\ref{tab:sample_data}, we can map each publisher to a unique
integer identifier.
\begin{verbatim}
bieberfever.com   -> 0
ultratrimfast.com -> 1
\end{verbatim}
This mapping allows us to represent the publisher column as an integer
array where the array indices correspond to the rows of the original
data set. For the publisher column, we can represent the unique
publishers as follows:
\begin{verbatim}
[0, 0, 1, 1]
\end{verbatim}
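
A small sketch of this encoding step, written for illustration rather
than taken from Druid's column builders, is shown below.
\begin{verbatim}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DictionaryEncodeSketch
{
  public static void main(String[] args)
  {
    List<String> column = List.of(
        "bieberfever.com", "bieberfever.com",
        "ultratrimfast.com", "ultratrimfast.com");

    // Assign each distinct value the next integer id, in order of
    // first appearance, and store the column as integer codes.
    Map<String, Integer> dictionary = new LinkedHashMap<>();
    List<Integer> encoded = new ArrayList<>();
    for (String value : column) {
      Integer id = dictionary.get(value);
      if (id == null) {
        id = dictionary.size();
        dictionary.put(value, id);
      }
      encoded.add(id);
    }

    System.out.println(dictionary); // {bieberfever.com=0, ultratrimfast.com=1}
    System.out.println(encoded);    // [0, 0, 1, 1]
  }
}
\end{verbatim}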

The resulting integer array lends itself very well to
compression. Generic compression algorithms on top of encodings are
very common in column-stores. We opted to use the LZF \cite{liblzf2013} compression
algorithm.

We can leverage similar compression algorithms for numeric
columns. For example, the clicks and revenue columns in
Table~\ref{tab:sample_data} can also be expressed as individual
arrays.
\begin{verbatim}
Clicks  -> [25, 42, 17, 170]
Revenue -> [15.70, 29.18, 17.31, 34.01]
\end{verbatim}
In this case we compress the raw values as opposed to their dictionary
representations.

\subsection{Filters}
To support arbitrary filter sets, Druid creates additional lookup
indices for string columns. These lookup indices are compressed and
Druid operates over the indices in their compressed form. Filters can
be expressed as Boolean equations of multiple lookup indices. Boolean
operations on indices in their compressed form are both performance and
space efficient.

Let us consider the publisher column in
Table~\ref{tab:sample_data}. For each unique publisher in
Table~\ref{tab:sample_data}, we can form some representation
indicating in which table rows a particular publisher is seen. We can
store this information in a binary array where the array indices
represent our rows. If a particular publisher is seen in a certain
row, that array index is marked as \texttt{1}. For example:
\begin{verbatim}
bieberfever.com   -> rows [0, 1] -> [1][1][0][0]
ultratrimfast.com -> rows [2, 3] -> [0][0][1][1]
\end{verbatim}

\texttt{bieberfever.com} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values
to row indices forms an inverted index \cite{tomasic1993performance}. If we want to know which
rows contain {\ttfamily bieberfever.com} or {\ttfamily ultratrimfast.com}, we can \texttt{OR} together
the \texttt{bieberfever.com} and \texttt{ultratrimfast.com} arrays.
\begin{verbatim}
[1][1][0][0] OR [0][0][1][1] = [1][1][1][1]
\end{verbatim}
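
The same construction, expressed with \texttt{java.util.BitSet} purely
for illustration, looks as follows; Druid stores these bitmaps in a
compressed form rather than as plain bit sets.
\begin{verbatim}
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InvertedIndexSketch
{
  public static void main(String[] args)
  {
    List<String> publisherColumn = List.of(
        "bieberfever.com", "bieberfever.com",
        "ultratrimfast.com", "ultratrimfast.com");

    // One bitmap per distinct value: bit i is set if row i has the value.
    Map<String, BitSet> index = new HashMap<>();
    for (int row = 0; row < publisherColumn.size(); row++) {
      index.computeIfAbsent(publisherColumn.get(row), v -> new BitSet())
           .set(row);
    }

    // Rows matching "bieberfever.com OR ultratrimfast.com"
    BitSet matches = (BitSet) index.get("bieberfever.com").clone();
    matches.or(index.get("ultratrimfast.com"));
    System.out.println(matches); // {0, 1, 2, 3}
  }
}
\end{verbatim}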

\begin{figure}
  \centering
  \includegraphics[width = 3in]{concise_plot}
  \caption{Integer array size versus CONCISE set size.}
  \label{fig:concise_plot}
\end{figure}

This approach of performing Boolean operations on large bitmap sets is
commonly used in search engines. Bitmap compression algorithms are a
well-defined area of research and often utilize run-length
encoding. Well-known algorithms include Byte-aligned Bitmap Code \cite{antoshenkov1995byte},
Word-Aligned Hybrid (WAH) code \cite{wu2006optimizing}, and Partitioned Word-Aligned
Hybrid (PWAH) compression \cite{van2011memory}. Druid opted to use the CONCISE
algorithm \cite{colantonio2010concise} as it can outperform WAH by reducing the size of the
compressed bitmaps by up to 50\%. Figure~\ref{fig:concise_plot} illustrates the number of
bytes using CONCISE compression versus using an integer array. The
results were generated on a cc2.8xlarge system with a single thread,
2G heap, 512m young gen, and a forced GC between each run. The data
set is a single day’s worth of data collected from the Twitter garden
hose \cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12
dimensions of varying cardinality. As an additional comparison, we
also resorted the data set rows to maximize compression.

In the unsorted case, the total CONCISE compressed size was 53,451,144
bytes and the total integer array size was 127,248,520 bytes. Overall,
CONCISE compressed sets are about 42\% of the size of integer arrays.
In the sorted case, the total CONCISE compressed size was 43,832,884
bytes and the total integer array size was 127,248,520 bytes. What is
interesting to note is that after sorting, global compression only
increased minimally. The ratio of total CONCISE set size to total integer array
size is 34\%. It is also interesting to note that as the cardinality
of a dimension approaches the total number of rows in a data set,
integer arrays actually require less space than CONCISE sets.

\subsection{Storage Engine}
Druid’s persistence components allow for different storage engines to
be plugged in, similar to Dynamo \cite{decandia2007dynamo}. These storage engines may store
data in in-memory structures such as the JVM heap or in memory-mapped
structures. The ability to swap storage engines allows for Druid to be
configured depending on a particular application’s specifications. An
in-memory storage engine may be operationally more expensive than a
memory-mapped storage engine but could be a better alternative if
performance is critical. At Metamarkets, we commonly use a
memory-mapped storage engine.

\section{Robustness and Fault-Tolerance}
\label{sec:robustness}
To achieve high system availability and data durability, Druid employs
several fault recovery techniques. Druid has no single point of
failure.

\subsection{Replication}
Druid replicates historical segments on multiple hosts. The number of
replicates in each tier of the historical compute cluster is fully
configurable. Setups that require high levels of fault tolerance can
be configured to have a high number of replicates. Replicates are
assigned to compute nodes by coordination nodes using the same load
distribution algorithm discussed in Section~\ref{sec:caching}. Conceptually,
broker nodes do not distinguish historical segments from their
replicates. Broker nodes forward queries to the first node they find
that contains data for the query.

Real-time segments follow a different replication model as real-time
segments are mutable. Recall that real-time nodes act as consumers of
a data stream. Multiple real-time nodes can read from the same message
bus if each maintains a unique offset, hence creating multiple copies
of a real-time segment. If a real-time node fails and recovers, it can
reload any indexes that were persisted to disk and read from the
message bus from the point it last committed an offset.

\subsection{Local Segment Cache}
Recall that each Druid compute node maintains a local cache of
historical segments it recently served. A compute node also has a
lookup table for segments it has in its cache and stores this lookup
table on disk. When a compute node is assigned a new segment to load,
the compute node will first check its local segment cache directory to
see if the segment had been previously downloaded. If a cache entry
exists, the compute node will directly read the segment binary files
and load the segment.
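
A sketch of this check-cache-then-download flow is shown below; the
directory layout and the deep storage call are placeholders for
illustration.
\begin{verbatim}
import java.io.File;

public class SegmentCacheSketch
{
  static File locateSegment(String segmentId, File cacheDir)
  {
    File cached = new File(cacheDir, segmentId);
    if (cached.exists()) {
      return cached;            // already downloaded: load it directly
    }
    return downloadFromDeepStorage(segmentId, cacheDir);
  }

  static File downloadFromDeepStorage(String segmentId, File cacheDir)
  {
    // Stand-in for fetching and decompressing the segment from S3/HDFS.
    return new File(cacheDir, segmentId);
  }

  public static void main(String[] args)
  {
    System.out.println(
        locateSegment("sample_segment", new File("/tmp/druid-cache")));
  }
}
\end{verbatim}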

The segment cache is also leveraged when a compute node is initially
started. During startup, a compute node will first read its lookup
table to determine what segments it has cached. All cached segments
are immediately loaded and served. This feature introduces minimal
overhead and allows a compute node to readily serve data as soon as it
comes online. By making data quickly available on startup and
minimizing initial startup time, compute nodes that become
inexplicably disconnected from the cluster can reconnect themselves
seamlessly. This also means that software updates can be pushed to
compute nodes without disruption to cluster operations. In practice, a
software update to a compute node can be completed before coordination
nodes even notice that the compute node has disconnected. At
Metamarkets, we update Druid through rolling restarts. Compute nodes
are updated one at a time and we experience no downtime or data loss
through the update process.

\subsection{Failure Detection}
If a compute node completely fails and becomes unavailable, the
ephemeral Zookeeper znodes it created are deleted. The master node
will notice that certain segments are insufficiently replicated or
missing altogether. Additional replicates will be created and
redistributed throughout the cluster.

We are moving towards building out infrastructure to support
programmatic creation of real-time nodes. In the near future, the
master node will also notice if real-time segments are insufficiently
replicated and automatically create additional real-time nodes as
redundant backups.

\subsection{Adding and Removing Nodes}
Starting and removing Druid nodes is a relatively simple process; all
that is required is to start and stop Java processes. There is minimal
operational overhead with adding nodes in batches. Scaling down the
cluster is usually done one node at a time with some time lapse
between shutdowns. This allows the master to have ample time to
redistribute load and create additional replicates. Shutting down
nodes in batches is generally not supported as it may destroy all
copies of a segment, which would lead to data loss.

\section{Performance Benchmarks}
\label{sec:benchmarks}
To benchmark Druid performance, we created a large test cluster with
6TB of uncompressed data, representing tens of billions of fact
rows. The data set contained more than a dozen dimensions, with
cardinalities ranging from the double digits to tens of millions. We computed
four metrics for each row (counts, sums, and averages). The data was
sharded first on timestamp and then on dimension values, creating
thousands of shards of roughly 8 million fact rows apiece.

The cluster used in the benchmark consisted of 100 historical compute
nodes, each with 16 cores, 60GB of RAM, 10 GigE Ethernet, and 1TB of
disk space. Collectively, the cluster comprised 1600 cores, 6TB of
RAM, fast Ethernet and more than enough disk space.

SQL statements are included in Table~\ref{tab:sql_queries} to describe the
purpose of each of the queries. Please note:
\begin{itemize}
\item The timestamp range of the queries encompassed all data.
\item Each machine was a 16-core machine with 60GB RAM and 1TB of local
  disk. The machine was configured to only use 15 threads for
  processing queries.
\item A memory-mapped storage engine was used (the machine was configured to memory map the data
  instead of loading it into the Java heap).
\end{itemize}

\begin{figure}
  \centering
  \includegraphics[width = 3in]{cluster_scan_rate}
  \caption{Druid cluster scan rate with lines indicating linear scaling
    from 25 nodes.}
  \label{fig:cluster_scan_rate}
\end{figure}

\begin{figure}
  \centering
  \includegraphics[width = 3in]{core_scan_rate}
  \caption{Druid core scan rate.}
  \label{fig:core_scan_rate}
\end{figure}

\begin{table*}
  \centering
  \caption{Druid Queries}
  \label{tab:sql_queries}
\begin{verbatim}
1  SELECT count(*) FROM _table_ WHERE timestamp >= ? AND timestamp < ?

2  SELECT count(*), sum(metric1) FROM _table_ WHERE timestamp >= ? AND timestamp < ?

3  SELECT count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM _table_
   WHERE timestamp >= ? AND timestamp < ?

4  SELECT high_card_dimension, count(*) AS cnt FROM _table_
   WHERE timestamp >= ? AND timestamp < ?
   GROUP BY high_card_dimension ORDER BY cnt LIMIT 100

5  SELECT high_card_dimension, count(*) AS cnt, sum(metric1) FROM _table_
   WHERE timestamp >= ? AND timestamp < ?
   GROUP BY high_card_dimension ORDER BY cnt LIMIT 100

6  SELECT high_card_dimension, count(*) AS cnt, sum(metric1), sum(metric2), sum(metric3), sum(metric4)
   FROM _table_ WHERE timestamp >= ? AND timestamp < ?
   GROUP BY high_card_dimension ORDER BY cnt LIMIT 100
\end{verbatim}
\end{table*}

%\begin{enumerate}
%\item \texttt{Select high\_card\_dimension, count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) AS cnt from \_table\_ where timestamp $\geq$ ? and timestamp $<$ ? group by high\_card\_dimension order by cnt limit 100;}
%\end{enumerate}

Figure~\ref{fig:cluster_scan_rate} shows the cluster scan rate and
Figure~\ref{fig:core_scan_rate} shows the core scan rate. In
Figure~\ref{fig:cluster_scan_rate} we also include projected linear
scaling based on the results of the 25 node cluster. In particular,
we observe diminishing marginal returns to performance in the size of
the cluster. Under linear scaling, SQL query 1 would have achieved a
speed of 37 billion rows per second on our 75 node cluster. In fact,
the speed was 26 billion rows per second. The speed-up of a parallel
computing system is often limited by the time needed for the
sequential operations of the system, in accordance with Amdahl's law
\cite{amdahl1967validity}.
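
For reference, if a fraction $p$ of the work parallelizes perfectly
and the remaining $1-p$ is sequential, Amdahl's law gives the speedup
on $N$ nodes as
\[
S(N) = \frac{1}{(1 - p) + p/N},
\]
which approaches $1/(1-p)$ as $N$ grows; this is consistent with the
diminishing marginal returns observed above.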

The first query listed in Table~\ref{tab:sql_queries} is a simple
count, achieving scan rates of 33M rows/second/core. We believe
the 75 node cluster was actually overprovisioned for the test
dataset, explaining the modest improvement over the 50 node cluster.
Druid's concurrency model is based on shards: one thread will scan one
shard. If the number of segments on a compute node modulo the number
of cores is small (e.g. 17 segments and 15 cores), then many of the
cores will be idle during the last round of the computation.

When we include more aggregations we see performance degrade. This is
because of the column-oriented storage format Druid employs. For the
\texttt{count(*)} queries, Druid only has to check the timestamp column to satisfy
the ``where'' clause. As we add metrics, it has to also load those metric
values and scan over them, increasing the amount of memory scanned.
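
The toy example below illustrates the effect: a filtered count touches
only the timestamp column, while each added metric forces one more
column array to be loaded and scanned. The arrays are stand-ins for
Druid's column files.
\begin{verbatim}
public class ColumnScanSketch
{
  public static void main(String[] args)
  {
    long[] timestamps = {1L, 2L, 3L, 4L};
    double[] metric1  = {15.70, 29.18, 17.31, 34.01};

    long count = 0;
    double sum = 0;
    for (int row = 0; row < timestamps.length; row++) {
      if (timestamps[row] >= 2L && timestamps[row] < 4L) { // "where" clause
        count++;              // count(*): timestamp column only
        sum += metric1[row];  // sum(metric1): one extra column scanned
      }
    }
    System.out.println(count + " rows, sum = " + sum);
  }
}
\end{verbatim}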

\section{Related Work}
\label{sec:related}
Cattell \cite{cattell2011scalable} maintains a great summary of existing Scalable SQL and
NoSQL data stores. In the landscape of distributed data stores, Druid
feature-wise sits somewhere between Google’s Dremel \cite{melnik2010dremel} and Powerdrill
\cite{hall2012processing}. Druid has most of the features implemented in Dremel (Dremel
handles arbitrary nested data structures while Druid only allows for a
single level of array-based nesting) and many of the interesting
compression algorithms mentioned in Powerdrill.

Although Druid builds on many of the same principles as other
distributed column-oriented data stores \cite{fink2012distributed}, most existing data
stores are designed to be key-value stores \cite{lerner2010redis}, or
document/extensible record stores \cite{stonebraker2005c}. Such data stores are great
solutions for traditional data warehouse needs and general
back-office/reporting usage. Typically, analysts will query these data
stores and build reports from the results. In-memory databases such as
SAP’s HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb} are examples of other data stores that
are highly suited for traditional data warehousing needs. Druid is a
front-office system designed such that user-facing dashboards can be
built on top of it. Similar to \cite{paraccel2013}, Druid has analytical features
built in. The main features Druid offers over traditional data
warehousing solutions are real-time data ingestion, interactive
queries, and interactive query latencies. In terms of real-time
ingestion and processing of data, Trident/Storm \cite{marz2013storm} and Streaming
Spark \cite{zaharia2012discretized} are other popular real-time computation systems, although
they lack the data storage capabilities of Druid. Spark/Shark \cite{engle2012shark} are
also doing similar work in the area of fast data analysis on large
scale data sets. Cloudera Impala \cite{cloudera2013} is another system focused on
optimizing query performance, but more so in Hadoop environments.

Druid leverages a unique combination of algorithms in its
architecture. Although we believe no other data store has the same set
of functionality as Druid, some of Druid’s features such as using
inverted indices to perform faster filters also exist in other data
stores \cite{macnicol2004sybase}.

\section{Conclusions}
\label{sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented,
real-time analytical data store. Druid is a highly customizable
solution that is optimized for fast query latencies. Druid ingests
data in real-time and is fault-tolerant. We discussed the performance
of Druid on billion-row data sets. We summarized key Druid architecture
aspects such as the storage format, query language, and general
execution. In the future, we plan to cover in more depth the different
algorithms we’ve developed for Druid and how other systems may plug
into Druid to achieve powerful use cases.

\section{Acknowledgements}
\label{sec:acknowledgements}
We want to thank Steve Harris and Jaypal Sethi for their feedback on improving this paper.
We also want to give recognition to Adam Smith; without him the first Metamarkets hackathon would not have
been organized and this paper would not have been created. Another special recognition goes to Katherine Chu for
helping to create all the images in this paper.

Druid could not have been built without the help of many great
engineers at Metamarkets and in the community. We want to thank Danny Yuan, Jae Hyeon Bae, Paul Baclace, Dave
Nielsen, and Dhruv Parthasarathy for their
contributions to Druid.

% The following two commands are all you need in the
% initial runs of your .tex file to
% produce the bibliography for the citations in your paper.
\bibliographystyle{abbrv}
\bibliography{druid}  % druid.bib is the name of the Bibliography in this case
% You must have a proper ".bib" file
% and remember to run:
% latex bibtex latex latex
% to resolve all references

%Generated by bibtex from your ~.bib file. Run latex,
%then bibtex, then latex twice (to resolve references).

%APPENDIX is optional.
% ****************** APPENDIX **************************************
% Example of an appendix; typically would start on a new page
%pagebreak

\end{document}