diff --git a/publications/vldb/druid.pdf b/publications/vldb/druid.pdf deleted file mode 100644 index f639bee76f1..00000000000 Binary files a/publications/vldb/druid.pdf and /dev/null differ diff --git a/publications/vldb/druid.tex b/publications/vldb/druid.tex deleted file mode 100644 index 981fb4dc9f9..00000000000 --- a/publications/vldb/druid.tex +++ /dev/null @@ -1,955 +0,0 @@ -% THIS IS AN EXAMPLE DOCUMENT FOR VLDB 2012 -% based on ACM SIGPROC-SP.TEX VERSION 2.7 -% Modified by Gerald Weber -% Removed the requirement to include *bbl file in here. (AhmetSacan, Sep2012) -% Fixed the equation on page 3 to prevent line overflow. (AhmetSacan, Sep2012) - -\documentclass{vldb} -\usepackage{graphicx} -\usepackage{balance} % for \balance command ON LAST PAGE (only there!) -\usepackage{fontspec} -\setmainfont[Ligatures={TeX}]{Times} -\usepackage{hyperref} -\graphicspath{{figures/}} - -\hyphenation{metamarkets nelson} - -\begin{document} - -% ****************** TITLE **************************************** - -\title{Druid: Real-time Analytical Data Store} - -% possible, but not really needed or used for PVLDB: -%\subtitle{[Extended Abstract] -%\titlenote{A full version of this paper is available as\textit{Author's Guide to Preparing ACM SIG Proceedings Using \LaTeX$2_\epsilon$\ and BibTeX} at \texttt{www.acm.org/eaddress.htm}}} - -% ****************** AUTHORS ************************************** - -% You need the command \numberofauthors to handle the 'placement -% and alignment' of the authors beneath the title. -% -% For aesthetic reasons, we recommend 'three authors at a time' -% i.e. three 'name/affiliation blocks' be placed beneath the title. -% -% NOTE: You are NOT restricted in how many 'rows' of -% "name/affiliations" may appear. We just ask that you restrict -% the number of 'columns' to three. -% -% Because of the available 'opening page real-estate' -% we ask you to refrain from putting more than six authors -% (two rows with three columns) beneath the article title. -% More than six makes the first-page appear very cluttered indeed. -% -% Use the \alignauthor commands to handle the names -% and affiliations for an 'aesthetic maximum' of six authors. -% Add names, affiliations, addresses for -% the seventh etc. author(s) as the argument for the -% \additionalauthors command. -% These 'additional authors' will be output/set for you -% without further effort on your part as the last section in -% the body of your article BEFORE References or any Appendices. - -\numberofauthors{7} % in this sample file, there are a *total* -% of EIGHT authors. SIX appear on the 'first-page' (for formatting -% reasons) and the remaining two appear in the \additionalauthors section. - -\author{ -% You can go ahead and credit any number of authors here, -% e.g. one 'row of three' or two rows (consisting of one row of three -% and a second row of one, two or three). -% -% The command \alignauthor (no curly braces needed) should -% precede each author name, affiliation/snail-mail address and -% e-mail address. Additionally, tag each line of -% affiliation/address with \affaddr, and tag the -% e-mail address with \email. -% -% 1st. author -\alignauthor Fangjin Yang\\ -\affaddr{Metamarkets, Inc.}\\ -\affaddr{625 2nd Street, Suite 230}\\ -\affaddr{San Francisco, CA, USA}\\ -\email{fangjin@metamarkets.com} -\alignauthor Eric Tschetter\\ -\affaddr{Metamarkets, Inc.}\\ -\affaddr{625 2nd Street, Suite 230}\\ -\affaddr{San Francisco, CA, USA}\\ -\email{eric@metamarkets.com} -\alignauthor Gian Merlino\\ -\affaddr{Metamarkets, Inc.}\\ -\affaddr{625 2nd Street, Suite 230}\\ -\affaddr{San Francisco, CA, USA}\\ -\email{gian@metamarkets.com} -\and -\alignauthor Nelson Ray\\ -\affaddr{Metamarkets, Inc.}\\ -\affaddr{625 2nd Street, Suite 230}\\ -\affaddr{San Francisco, CA, USA}\\ -\email{nelson@metamarkets.com} -\alignauthor Xavier Léauté\\ -\affaddr{Metamarkets, Inc.}\\ -\affaddr{625 2nd Street, Suite 230}\\ -\affaddr{San Francisco, CA, USA}\\ -\email{xavier@metamarkets.com} -\alignauthor Deep Ganguli\\ -\affaddr{Metamarkets, Inc.}\\ -\affaddr{625 2nd Street, Suite 230}\\ -\affaddr{San Francisco, CA, USA}\\ -\email{deep@metamarkets.com} -} -% There's nothing stopping you putting the seventh, eighth, etc. -% author on the opening page (as the 'third row') but we ask, -% for aesthetic reasons that you place these 'additional authors' -% in the \additional authors block, viz. -\additionalauthors{Michael Driscoll (Metamarkets, \texttt{mike@metamarkets.com})} -\date{21 March 2013} -% Just remember to make sure that the TOTAL number of authors -% is the number that will appear on the first page PLUS the -% number that will appear in the \additionalauthors section. - -\maketitle - -\begin{abstract} -Druid is a scalable, real-time analytical data store that supports -ad-hoc queries on large-scale data sets. The system combines a -columnar data layout, a shared-nothing architecture, and an advanced -indexing structure to allow arbitrary exploration of billion-row -tables with sub-second latencies. Druid scales horizontally and is the -core engine of the Metamarkets platform. In this paper, we detail the -architecture and implementation of Druid and describe how it solves -the real-time data ingestion problem. -\end{abstract} - -\section{Introduction} -In recent years, enterprises are facing ever-growing collections of -data in all forms. The scale of data has reached terabytes, even -petabytes of information per day. Companies are increasingly realizing -the importance of unlocking the insights contained within this -data. Numerous database vendors such as IBM’s Netezza \cite{singh2011introduction}, Vertica -\cite{bear2012vertica}, and EMC’s Greenplum \cite{miner2012unified} offer data warehousing solutions, and -several research papers \cite{barroso2009datacenter, -chaudhuri1997overview, dewitt1992parallel} directly address this problem as -well. As the interactive data exploration space becomes more mature, -it is apparent that real-time ingestion and exploration of data will -unlock business-critical decisions for front office and back office -analysts. - -Metamarkets realized early on that for high data volumes, existing -Relational Database Management Systems (RDBMS) and most NoSQL -architectures were not sufficient to address the performance and -use-case needs of the business intelligence space. Druid was built to -address a gap we believe exists in the current big-data ecosystem for -a real-time analytical data store. Druid is a distributed, columnar, -shared-nothing data store designed to reliably scale to petabytes of -data and thousands of cores. It is a highly available system designed -to run on commodity hardware with no downtime in the face of failures, -data imports or software updates. We have been building Druid over the -course of the last two years and it is the core engine of the -Metamarkets technology stack. - -In many ways, Druid shares similarities with other interactive query systems -\cite{melnik2010dremel}, main-memory databases \cite{farber2012sap}, and widely-known distributed data -stores such as BigTable \cite{chang2008bigtable}, Dynamo \cite{decandia2007dynamo}, and Cassandra \cite{lakshman2010cassandra}. Unlike -most traditional data stores, Druid operates mainly on read-only data -and has limited functionality for writes. The system is highly optimized -for large-scale transactional data aggregation and arbitrarily deep data exploration. Druid is highly configurable -and allows users to adjust levels of fault tolerance and -performance. - -Druid builds on the ideas of other distributed data stores, real-time -computation engines, and search engine indexing algorithms. In this -paper, we make the following contributions to academia: -\begin{itemize} -\item We outline Druid’s real-time ingestion and query capabilities -and explain how we can explore events within milliseconds of their -creation -\item We describe how the architecture allows for fast and flexible -queries and how inverted indices can be applied to quickly filter -data -\item We present experiments benchmarking Druid’s performance -\end{itemize} -The format of the paper is as follows: Section \ref{sec:data-model} describes the Druid -data model. Section \ref{sec:cluster} presents an overview of the components of a -Druid cluster. Section \ref{sec:query-api} outlines the query API. Section \ref{sec:storage} describes -data storage format in greater detail. Section \ref{sec:robustness} discusses Druid -robustness and failure responsiveness. Section \ref{sec:benchmarks} provides our -performance benchmarks. Section \ref{sec:related} lists the related work and Section \ref{sec:conclusions} presents -our conclusions. - -\section{Data Model} -\label{sec:data-model} -The fundamental storage unit in Druid is the segment. Distribution and -replication in Druid are always done at a segment level. Segments -encapsulate a partition of a larger transactional data set. To better -understand how a typical row/column collection of data translates into -a Druid segment, consider the data set shown in Table~\ref{tab:sample_data}. - -\begin{table*} -\centering -\caption{Sample Druid data} -\label{tab:sample_data} -\texttt{ - \begin{tabular}{ l l l l l l l l l } - \hline - \textbf{Timestamp} & \textbf{Publisher} & \textbf{Advertiser} & \textbf{Gender} & \textbf{Country} & \textbf{Impressions} & \textbf{Clicks} & \textbf{Revenue} \\ - 2011-01-01T01:00:00Z & bieberfever.com & google.com & Male & USA & 1800 & 25 & 15.70 \\ - 2011-01-01T01:00:00Z & bieberfever.com & google.com & Male & USA & 2912 & 42 & 29.18 \\ - 2011-01-01T02:00:00Z & ultratrimfast.com & google.com & Male & USA & 1953 & 17 & 17.31 \\ - 2011-01-01T02:00:00Z & ultratrimfast.com & google.com & Male & USA & 3194 & 170 & 34.01 \\ - \end{tabular} -} -\end{table*} - -A segment is composed of multiple binary files, each representing a -column of a data set. The data set in Table~\ref{tab:sample_data} consists of 8 distinct -columns, one of which is the timestamp column. Druid always requires a -timestamp column because it (currently) only operates with event-based -data. Segments always represent some time interval and each column -file contains the specific values for that column over the time -interval. Since segments always contain data for a time range, it is -logical that Druid partitions data into smaller chunks based on the -timestamp value. In other words, segments can be thought of as blocks -of data that represent a certain granularity of time. For example, if -we wanted to shard the data in Table~\ref{tab:sample_data} to an hourly granularity, the -partitioning algorithm would result in two segments, one representing -each hour of 2011-01-01. Similarly, if we sharded the data to a daily -granularity, we would create a single segment for 2011-01-01. - -Partitioning the data based on granularity buckets allows users to -fine tune the degree of parallelization they want in Druid. A data set -representing a year’s worth of data may be bucketed by day, and a data -set representing only a day’s worth of day may be partitioned by -hour. Sharding on a single dimension (time) may still result in segments that are too large to manage if the data volume is sufficiently high. -To create more operable partition chunks, Druid may -additionally shard data based on other factors such as dimension -cardinality. Each shard creates a segment and hence, segments are uniquely identified by a data source -id describing the data, the time interval of the data, a version -indicating when the segment was created, and a shard partition number. - -Data in Druid can be conceptually thought of as being either real-time -or historical. Real-time data refers to recently ingested data; -typically, in a production setting, this will be data for the current -hour. Historical data refers to any data that is older. Segments can -be similarly classified as being either real-time or historical. - -Historical segments are immutable and do not support insert, delete, -or update semantics. By maintaining immutable blocks of data within -the system, we can maintain a consistent snapshot of historical -data and provide read consistency without having to worry about -concurrent updates and deletes. If updates to a historical segment are -required, we build a new segment for the same data source and time -interval with the updated data. This new segment will have an updated version identifier. - -Multiple segments for the same data source and time range -may exist in the system at any time. To provide read consistency, -Druid read operations always access data in a particular time range -from the segment with the latest version identifier for that time -range. Historical segments may be stored on local disk or in a -key-value “deep” store such as S3 \cite{decandia2007dynamo} or HDFS \cite{shvachko2010hadoop}. All historical -segments have associated metadata describing properties of the segment -such as size in bytes, compression format, and location in deep -storage. - -Real-time segments are mutable and generally represent a much shorter -duration of time than historical segments. Real-time segments contain -recent data and are incrementally populated as new events are -ingested. On a periodic basis, real-time segments are converted into -historical segments. Additional details about this conversion process -are given in Section~\ref{sec:realtime}. - -\section{Cluster} -\label{sec:cluster} -A Druid cluster consists of different types of nodes, each performing -a specific function. The composition of a Druid cluster is shown in -Figure~\ref{fig:druid_cluster}. - -\begin{figure*} -\centering -\includegraphics[width = 6in]{druid_cluster} -\caption{An overview of a Druid cluster.} -\label{fig:druid_cluster} -\end{figure*} - -Recall that data in Druid is classified as either real-time or -historical. The Druid cluster is architected to reflect this -conceptual separation of data. Real-time nodes are responsible for -ingesting, storing, and responding to queries for the most recent -events. Similarly, historical compute nodes are responsible for -loading and responding to queries for historical events. - -Data in Druid is stored on storage nodes. Storage nodes can be either -compute or real-time nodes. Queries to access this data will -typically first hit a layer of broker nodes. Broker nodes are -responsible for finding and routing queries down to the storage nodes -that host the pertinent data. The storage nodes compute their portion -of the query response in parallel and return their results to the -brokers. Broker nodes, compute nodes, and realtime nodes can all be -classified as queryable nodes. - -Druid also has a set of coordination nodes to manage load assignment, -distribution, and replication. Coordination nodes are not queryable -and instead focus on maintaining cluster stability. Coordination nodes -have an external dependency on a MySQL database. - -Druid relies on an Apache Zookeeper \cite{hunt2010zookeeper} cluster -for coordination. This dependency is required because there is no -direct coordination-related communication between Druid nodes. The -following sections will discuss each Druid component in greater -detail. - -\subsection{Apache Zookeeper} -Zookeeper is a service for coordinating processes of distributed -applications. Zookeeper provides connecting applications an -abstraction of a hierarchy of data nodes known as znodes. Each znode -is part of a hierarchical namespace, similar to file -systems. Zookeeper has the concept of ephemeral and permanent -znodes. Permanent nodes must be created and destroyed explicitly by a -connecting application. Ephemeral znodes can be created by connecting -applications and deleted either explicitly or if the session that -created the znode is terminated (such as in the event of service -failure). - -\subsection{Historical Compute Nodes} -Historical compute nodes are the main workers of a Druid cluster and -are self-contained and self-sufficient. Compute nodes load historical -segments from permanent/deep storage and expose them for -querying. There is no single point of contention between the nodes and -nodes have no knowledge of one another. Compute nodes are -operationally simple; they only know how to perform the tasks they are -assigned. To help other services discover compute nodes and the data -they hold, every compute node maintains a constant Zookeeper -connection. Compute nodes announce their online state and the segments -they serve by creating ephemeral nodes under specifically configured -Zookeeper paths. Instructions for a given compute node to load new -segments or drop existing segments are sent by creating ephemeral -znodes under a special “load queue” path associated with the compute -node. - -To expose a segment for querying, a compute node must first possess a -local copy of the segment. Before a compute node downloads a segment -from deep storage, it first checks a local disk directory (cache) to -see if the segment already exists in local storage. If no cache -information about the segment is present, the compute node will -download metadata about the segment from Zookeeper. This metadata -includes information about where the segment is located in deep -storage and about how to decompress and process the segment. Once a -compute node completes processing a segment, the node announces (in -Zookeeper) that it is serving the segment. At this point, the segment -is queryable. - -\subsubsection{Tiers} -\label{sec:tiers} -Compute nodes can be grouped in different tiers, where all nodes in a -given tier are identically configured. Different performance and -fault-tolerance parameters can be set for each tier. The purpose of -tiered nodes is to enable higher or lower priority segments to be -distributed according to their importance. For example, it is possible -to spin up a “hot” tier of compute nodes that have a high number of -cores and a large RAM capacity. The “hot” cluster can be configured to -download more frequently accessed segments. A parallel “cold” cluster -can also be created with much less powerful backing hardware. The -“cold” cluster would only contain less frequently accessed segments. - -\subsection{Real-time Nodes} -\label{sec:realtime} -Real-time nodes encapsulate the functionality to ingest and query -real-time data streams. Data indexed via these nodes is immediately -available for querying. Real-time nodes are a consumer of data and -require a corresponding producer to provide the data -stream. Typically, for data durability purposes, a message bus such as -Kafka \cite{kreps2011kafka} sits between the producer and the real-time node as shown -in Figure~\ref{fig:data-ingestion}. - -\begin{figure} -\centering -\includegraphics[width = 3in]{druid_message_bus} -\caption{Real-time data ingestion.} -\label{fig:data-ingestion} -\end{figure} - -The purpose of the message bus in Figure~\ref{fig:data-ingestion} is to act as a buffer for -incoming events. The message bus can maintain offsets indicating the -position in an event stream that a real-time node has read up to and -real-time nodes can update these offsets periodically. The message bus also acts as a backup storage for recent events. -Real-time nodes ingest data by reading events from the message bus. The time from event creation to message bus storage to -event consumption is on the order of hundreds of milliseconds. - -Real-time nodes maintain an in-memory index for all incoming -events. These indexes are incrementally populated as new events appear on the message bus. The indexes are also directly queryable. -Real-time nodes persist their indexes to disk either periodically or after some maximum row limit is -reached. After each persist, a real-time node updates the message bus -with the offset of the last event of the most recently persisted -index. Each persisted index is immutable. If a real-time node fails and recovers, it can simply reload -any indexes that were persisted to disk and continue reading the -message bus from the point the last offset was committed. Periodically committing offsets reduces the number of messages a real-time -node has to rescan after a failure scenario. - -Real-time nodes maintain a consolidated view of the currently updating -index and of all indexes persisted to disk. This unified view allows -all indexes on a node to be queried. On a periodic basis, the nodes will -schedule a background task that searches for all persisted indexes of -a data source. The task merges these indexes together and builds a -historical segment. The nodes will upload the segment to deep storage -and provide a signal for the historical compute nodes to begin serving -the segment. The ingest, persist, merge, and handoff steps are fluid; -there is no data loss as a real-time node converts a real-time segment -to a historical one. Figure~\ref{fig:data-durability} illustrates the process. - -\begin{figure} -\centering -\includegraphics[width = 3in]{druid_realtime_flow} -\caption{Real-time data durability} -\label{fig:data-durability} -\end{figure} - -Similar to compute nodes, real-time nodes announce segments in -Zookeeper. Unlike historical segments, real-time segments may -represent a period of time that extends into the future. For example, -a real-time node may announce it is serving a segment that contains -data for the current hour. Before the end of the hour, the real-time -node continues to collect data for the hour. Every 10 minutes (the -persist period is configurable), the node will flush and persist its -in-memory index to disk. At the end of the current hour, the real-time -node prepares to serve data for the next hour by creating a new index -and announcing a new segment for the next hour. The node does not -immediately merge and build a historical segment for the previous hour -until after some window period has passed. Having a window period -allows for straggling data points to come in and minimizes the risk of -data loss. At the end of the window period, the real-time node will -merge all persisted indexes, build a historical segment for the -previous hour, and hand the segment off to historical nodes to -serve. Once the segment is queryable on the historical nodes, the -real-time node flushes all information about the segment and -unannounces it is serving the segment. - -Real-time nodes are highly scalable. If the data volume and ingestion -rates for a given data source exceed the maximum capabilities of a -single node, additional nodes can be added. Multiple nodes can -consume events from the same stream, and every individual node only -holds a portion of the total number of events. This creates natural -partitions across nodes. Each node announces the real-time segment it -is serving and each real-time segment has a partition number. Data -from individual nodes will be merged at the Broker level. To our -knowledge, the largest production level real-time Druid cluster is -consuming approximately 2 TB of raw data per hour. - -\subsection{Broker Nodes} -Broker nodes act as query routers to other queryable nodes such as -compute and real-time nodes. Broker nodes understand the metadata -published in Zookeeper about what segments exist and on what nodes the -segments are stored. Broker nodes route incoming queries such that the queries hit -the right storage nodes. Broker nodes also merge partial results from -storage nodes before returning a final consolidated result to the -caller. Additionally, brokers provide an extra level of data -durability as they maintain a cache of recent results. In the event -that multiple storage nodes fail and all copies of a segment are -somehow lost, it is still possible that segment results can still be -returned if that information exists in the cache. - -\subsubsection{Timeline} -To determine the correct nodes to forward queries to, Broker nodes -first build a view of the world from information in Zookeeper. Recall -that Druid uses Zookeeper to maintain information about all compute -and real-time nodes in a cluster and the segments those nodes are -serving. For every data source in Zookeeper, the Broker node builds a -timeline of segments for the data source and the nodes that serve them. A timeline -consists of segments and represents which segments contain data for -what ranges of time. Druid may have multiple segments where the data -source and interval are the same but versions differ. The timeline -view will always surface segments with the most recent version -identifier for a time range. If two segments intervals overlap, the segment with the more recent -version always has precedence. When queries are received for a specific -data source and interval, the Broker node performs a lookup on the -timeline associated with the query data source for the query interval -and retrieves the segments that contain data for the query. The broker -node maps these segments to the storage nodes that serve them and -forwards the query down to the respective nodes. - - -\subsubsection{Caching} -\label{sec:caching} -Broker nodes employ a distributed cache with a LRU \cite{o1993lru, -kim2001lrfu} cache invalidation strategy. The broker cache stores -per segment results. The cache can be local to each broker node or -shared across multiple nodes using an external distributed cache -such as memcached \cite{fitzpatrick2004distributed}. Recall that each time a broker node receives a -query, it first maps the query to a set of segments. A subset of -these segment results may already exist in the cache and the results -can be directly pulled from the cache. For any segment results that -do not exist in the cache, the broker node will forward the query -to the compute nodes. Once the compute nodes return their results, -the broker will store those results in the cache. Real-time segments -are never cached and hence requests for real-time data will always -be forwarded to real-time nodes. Real-time data is perpetually -changing and caching the results would be unreliable. - - -\subsection{Coordination (Master) Nodes} -The Druid coordination or master nodes are primarily in charge of -segment management and distribution. The Druid master is responsible -for loading new segments, dropping outdated segments, managing segment -replication, and balancing segment load. Druid uses a multi-version -concurrency control swapping protocol for managing segments in order -to maintain stable views. - -The Druid master runs periodically to determine the current state of -the cluster. It makes decisions by comparing the expected state of the -cluster with the actual state of the cluster at the time of the -run. As with all Druid nodes, the Druid master maintains a connection -to Zookeeper for current cluster information. The master also -maintains a connection to a MySQL database that contains additional -operational parameters and configurations. One of the key pieces of -information located in the MySQL database is a segment table that -contains a list of historical segments that should be served. This -table can be updated by any service that creates historical -segments. The MySQL database also contains a rule table that governs -how segments are created, destroyed, and replicated in the cluster. - -The master does not directly communicate with a compute node when -assigning it work; instead the master creates an ephemeral znode in -Zookeeper containing information about what the compute node should -do. The compute node maintains a similar connection to Zookeeper to -monitor for new work. - -\subsubsection{Rules} -Historical segments are loaded and dropped from the cluster based on a -set of rules. Rules indicate how segments should be assigned to -different compute node tiers and how many replicates of a segment -should exist in each tier. Rules may also indicate when segments -should be dropped entirely from the cluster. The master loads a set of -rules from a rule table in the MySQL database. Rules may be specific -to a certain data source and/or a default set of rules can be -configured. The master will cycle through all available segments and -match each segment with the first rule that applies to it. - -\subsubsection{Load Balancing} -In a typical production environment, queries often hit dozens or even -hundreds of data segments. Since each compute node has limited -resources, historical segments must be distributed among the cluster -to ensure that the cluster load is not too imbalanced. Determining -optimal load distribution requires some knowledge about query patterns -and speeds. Typically, queries cover recent data spanning contiguous -time intervals for a single data source. On average, queries that -access smaller segments are faster. - -These query patterns suggest replicating recent historical segments at -a higher rate, spreading out large segments that are close in time to -different compute nodes, and co-locating segments from different data -sources. To optimally distribute and balance segments among the -cluster, we developed a cost-based optimization procedure that takes -into account the segment data source, recency, and size. The exact -details of the algorithm are beyond the scope of this paper. - -\section{Query API} -\label{sec:query-api} -Queries to Druid are made in the form of POST requests. All queryable -Druid nodes share the same query API. The body of the POST request is -a JSON object containing key-value pairs specifying various query -parameters. A typical query will contain the data source name, the -granularity of the result data, the time range of interest, and the -type of request. A sample time series query is shown below: -\begin{verbatim} -{ - "queryType" : "timeseries", - "dataSource" : "sample_data", - "intervals" : "2013-01-01/2013-01-02", - "filter" : { - "type" : "selector", - "dimension" : "poets", - "value" : "Ke$ha" - }, - "granularity" : "day", - "aggregations" : [ - { - "type" : "count", - "fieldName" : "row", - "name" : "row" - } - ] -} -\end{verbatim} - -Certain query types will also support a filter set. A filter set is an -arbitrary Boolean expression of dimension name and value -pairs. Support for complex nested filter sets enables flexibility and -provides the ability to deeply explore data. - -The exact query syntax depends on the query type and the information requested. -It is beyond the scope of this paper to describe the Query API in full detail. -We are also in the process of building out SQL support for Druid. - -\section{Storage} -\label{sec:storage} -Druid is a column-oriented data store. When considering aggregates -over a large number of events, the advantages storing data as columns -rather than rows are well documented \cite{cattell2011scalable}. Column storage allows for -more efficient CPU usage as only what is needed is actually loaded and -scanned. In a row oriented data store, all columns associated with a -row must be scanned as part of an aggregation. The additional scan -time can introduce performance degradations as high as 250\% \cite{bear2012vertica}. - -\subsection{Column Types} -Druid has multiple column types to represent the various column value -formats. Depending on the column type, different compression methods -are used to reduce the cost of storing a column in memory and on -disk. In the example given in Table~\ref{tab:sample_data}, the -publisher, advertiser, gender, and country columns only contain -strings. String columns can be dictionary encoded. Dictionary encoding -is a common method to compress data and has been used in other data -stores such as Powerdrill \cite{hall2012processing}. In the example in -Table~\ref{tab:sample_data}, we can map each publisher to an unique -integer identifier. -\begin{verbatim} -bieberfever.com -> 0 -ultratrimfast.com -> 1 -\end{verbatim} -This mapping allows us to represent the publisher column as an integer -array where the array indices correspond to the rows of the original -data set. For the publisher column, we can represent the unique -publishers as follows: -\begin{verbatim} -[0, 0, 1, 1] -\end{verbatim} - -The resulting integer array lends itself very well to -compression. Generic compression algorithms on top of encodings are -very common in column-stores. We opted to use the LZF \cite{liblzf2013} compression -algorithm. - -We can leverage similar compression algorithms for numeric -columns. For example, the clicks and revenue columns in -Table~\ref{tab:sample_data} can also be expressed as individual -arrays. -\begin{verbatim} -Clicks -> [25, 42, 17, 170] -Revenue -> [15.70, 29.18, 17.31, 34.01] -\end{verbatim} -In this case we compress the raw values as opposed to their dictionary -representations. - -\subsection{Filters} -To support arbitrary filter sets, Druid creates additional lookup -indices for string columns. These lookup indices are compressed and -Druid operates over the indices in their compressed form. Filters can -be expressed as Boolean equations of multiple lookup indices. Boolean -operations of indices in their compressed form is both performance and -space efficient. - -Let us consider the publisher column in -Table~\ref{tab:sample_data}. For each unique publisher in -Table~\ref{tab:sample_data}, we can form some representation -indicating which table rows a particular publisher is seen. We can -store this information in a binary array where the array indices -represent our rows. If a particular publisher is seen in a certain -row, that array index is marked as \texttt{1}. For example: -\begin{verbatim} -bieberfever.com -> rows [0, 1] -> [1][1][0][0] -ultratrimfast.com -> rows [2, 3] -> [0][0][1][1] -\end{verbatim} - -\texttt{\href{http://bieberfever.com}{bieberfever.com}} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values -to row indices forms an inverted index \cite{tomasic1993performance}. If we want to know which -rows contain {\ttfamily bieberfever.com} or {\ttfamily ultratrimfast.com}, we can \texttt{OR} together -the \texttt{bieberfever.com} and \texttt{ultratrimfast.com} arrays. -\begin{verbatim} -[0][1][0][1] OR [1][0][1][0] = [1][1][1][1] -\end{verbatim} - -\begin{figure} -\centering -\includegraphics[width = 3in]{concise_plot} -\caption{Integer array size versus CONCISE set size.} -\label{fig:concise_plot} -\end{figure} - -This approach of performing Boolean operations on large bitmap sets is -commonly used in search engines. Bitmap compression algorithms are a -well-defined area of research and often utilize run-length -encoding. Well known algorithms include Byte-aligned Bitmap Code \cite{antoshenkov1995byte}, -Word-Aligned Hybrid (WAH) code \cite{wu2006optimizing}, and Partitioned Word-Aligned -Hybrid (PWAH) compression \cite{van2011memory}. Druid opted to use the CONCISE -algorithm \cite{colantonio2010concise} as it can outperform WAH by reducing the size of the -compressed bitmaps by up to 50\%. Figure~\ref{fig:concise_plot} illustrates the number of -bytes using CONCISE compression versus using an integer array. The -results were generated on a cc2.8xlarge system with a single thread, -2G heap, 512m young gen, and a forced GC between each run. The data -set is a single day’s worth of data collected from the Twitter garden -hose \cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12 -dimensions of varying cardinality. As an additional comparison, we -also resorted the data set rows to maximize compression. - -In the unsorted case, the total CONCISE compressed size was 53,451,144 -bytes and the total integer array size was 127,248,520 bytes. Overall, -CONCISE compressed sets are about 42\% smaller than integer arrays. -In the sorted case, the total CONCISE compressed size was 43,832,884 -bytes and the total integer array size was 127,248,520 bytes. What is -interesting to note is that after sorting, global compression only -increased minimally. The total CONCISE set size to total integer array -size is 34\%. It is also interesting to note that as the cardinality -of a dimension approaches the total number of rows in a data set, -integer arrays actually require less space than CONCISE sets. - -\subsection{Storage Engine} -Druid’s persistence components allows for different storage engines to -be plugged in, similar to Dynamo \cite{decandia2007dynamo}. These storage engines may store -data in in-memory structures such as the JVM heap or in memory mapped -structures. The ability to swap storage engines allows for Druid to be -configured depending on a particular application’s specifications. An -in-memory storage engine may be operationally more expensive than a -memory-mapped storage engine but could be a better alternative if -performance is critical. At Metamarkets, we commonly use a -memory-mapped storage engine. - -\section{Robustness and Fault-Tolerance} -\label{sec:robustness} -To achieve high system availability and data durability, Druid employs -several fault recovery techniques. Druid has no single point of -failure. - -\subsection{Replication} -Druid replicates historical segments on multiple hosts. The number of -replicates in each tier of the historical compute cluster is fully -configurable. Setups that require high levels of fault tolerance can -be configured to have a high number of replicates. Replicates are -assigned to compute nodes by coordination nodes using the same load -distribution algorithm discussed in Section~\ref{sec:caching}. Conceptually, -broker nodes do not distinguish historical segments from their -replicates. Broker nodes forward queries to the first node it finds -that contains data for the query. - -Real-time segments follow a different replication model as real-time -segments are mutable. Recall that real-time nodes act as consumers of -a data stream. Multiple real-time nodes can read from the same message -bus if each maintains a unique offset, hence creating multiple copies -of a real-time segment. If a real-time node fails and recovers, it can -reload any indexes that were persisted to disk and read from the -message bus from the point it last committed an offset. - -\subsection{Local Segment Cache} -Recall that each Druid compute node maintains a local cache of -historical segments it recently served. A compute node also has a -lookup table for segments it has in its cache and stores this lookup -table on disk. When a compute node is assigned a new segment to load, -the compute node will first check its local segment cache directory to -see if the segment had been previously downloaded. If a cache entry -exists, the compute node will directly read the segment binary files -and load the segment. - -The segment cache is also leveraged when a compute node is initially -started. During startup, a compute node will first read its lookup -table to determine what segments it has cached. All cached segments -are immediately loaded and served. This feature introduces minimal -overhead and allows a compute node to readily serve data as soon as it -comes online. By making data quickly available on startup and -minimizing initial startup time, compute nodes that become -inexplicably disconnected from the cluster can reconnect themselves -seamlessly. This also means that software updates can be pushed to -compute nodes without disruption to cluster operations. In practice, a -software update to a compute node can be completed before coordination -nodes even notice that the compute node has disconnected. At -Metamarkets, we update Druid through rolling restarts. Compute nodes -are updated one at a time and we experience no downtime or data loss -through the update process. - -\subsection{Failure Detection} -If a compute node completely fails and becomes unavailable, the -ephemeral Zookeeper znodes it created are deleted. The master node -will notice that certain segments are insufficiently replicated or -missing altogether. Additional replicates will be created and -redistributed throughout the cluster. - -We are moving towards building out infrastructure to support -programmatic creation of real-time nodes. In the near future, the -master node will also notice if real-time segments are insufficiently -replicated and automatically create additional real-time nodes as -redundant backups. - -\subsection{Adding and Removing Nodes} -Starting and removing Druid nodes is a relatively simple process; all -that is required is to start and stop Java processes. There is minimal -operational overhead with adding nodes in batches. Scaling down the -cluster is usually done one node at a time with some time lapse -between shutdowns. This allows the master to have ample time to -redistribute load and create additional replicates. Shutting down -nodes in batches is generally not supported as it may destroy all -copies of a segment, which would lead to data loss. - -\section{Performance Benchmarks} -\label{sec:benchmarks} -To benchmark Druid performance, we created a large test cluster with -6TB of uncompressed data, representing tens of billions of fact -rows. The data set contained more than a dozen dimensions, with -cardinalities ranging from the double digits to tens of millions. We computed -four metrics for each row (counts, sums, and averages). The data was -sharded first on timestamp then on dimension values, creating -thousands of shards roughly 8 million fact rows apiece. - -The cluster used in the benchmark consisted of 100 historical compute -nodes, each with 16 cores, 60GB of RAM, 10 GigE Ethernet, and 1TB of -disk space. Collectively, the cluster comprised of 1600 cores, 6TB or -RAM, fast Ethernet and more than enough disk space. - -SQL statements are included in Table~\ref{tab:sql_queries} to describe the -purpose of each of the queries. Please note: -\begin{itemize} -\item The timestamp range of the queries encompassed all data. -\item Each machine was a 16-core machine with 60GB RAM and 1TB of local - disk. The machine was configured to only use 15 threads for - processing queries. -\item A memory-mapped storage engine was used (the machine was configured to memory map the data - instead of loading it into the Java heap.) -\end{itemize} - -\begin{figure} -\centering -\includegraphics[width = 3in]{cluster_scan_rate} -\caption{Druid cluster scan rate with lines indicating linear scaling - from 25 nodes.} -\label{fig:cluster_scan_rate} -\end{figure} - -\begin{figure} -\centering -\includegraphics[width = 3in]{core_scan_rate} -\caption{Druid core scan rate.} -\label{fig:core_scan_rate} -\end{figure} - -\begin{table*} - \centering - \caption{Druid Queries} - \label{tab:sql_queries} -\begin{verbatim} -1 SELECT count(*) FROM _table_ WHERE timestamp >= ? AND timestamp < ? - -2 SELECT count(*), sum(metric1) FROM _table_ WHERE timestamp >= ? AND timestamp < ? - -3 SELECT count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM _table_ - WHERE timestamp >= ? AND timestamp < ? - -4 SELECT high_card_dimension, count(*) AS cnt FROM _table_ - WHERE timestamp >= ? AND timestamp < ? - GROUP BY high_card_dimension ORDER BY cnt limit 100 - -5 SELECT high_card_dimension, count(*) AS cnt, sum(metric1) FROM _table_ - WHERE timestamp >= ? AND timestamp < ? - GROUP BY high_card_dimension ORDER BY cnt limit 100 - -6 SELECT high_card_dimension, count(*) AS cnt, sum(metric1), sum(metric2), sum(metric3), sum(metric4) - FROM _table_ WHERE timestamp >= ? AND timestamp < ? - GROUP BY high_card_dimension ORDER BY cnt limit 100 -\end{verbatim} -\end{table*} - -%\begin{enumerate} -%\item \texttt{Select high\_card\_dimension, count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) AS cnt from \_table\_ where timestamp $\geq$ ? and timestamp $<$ ? group by high\_card\_dimension order by cnt limit 100;} -%\end{enumerate} - -Figure~\ref{fig:cluster_scan_rate} shows the cluster scan rate and -Figure~\ref{fig:core_scan_rate} shows the core scan rate. In -Figure~\ref{fig:cluster_scan_rate} we also include projected linear -scaling based on the results of the 25 core cluster. In particular, -we observe diminishing marginal returns to performance in the size of -the cluster. Under linear scaling, SQL query 1 would have achieved a -speed of 37 billion rows per second on our 75 node cluster. In fact, -the speed was 26 billion rows per second. The speed up of a parallel -computing system is often limited by the time needed for the -sequential operations of the system, in accordance with Amdahl's law -\cite{amdahl1967validity}. - -The first query listed in Table~\ref{tab:sql_queries} is a simple -count, achieving scan rates of 33M rows/second/core. We believe -the 75 node cluster was actually overprovisioned for the test -dataset, explaining the modest improvement over the 50 node cluster. -Druid's concurrency model is based on shards: one thread will scan one -shard. If the number of segments on a compute node modulo the number -of cores is small (e.g. 17 segments and 15 cores), then many of the -cores will be idle during the last round of the computation. - -When we include more aggregations we see performance degrade. This is -because of the column-oriented storage format Druid employs. For the -\texttt{count(*)} queries, Druid only has to check the timestamp column to satisfy -the ``where'' clause. As we add metrics, it has to also load those metric -values and scan over them, increasing the amount of memory scanned. - -\section{Related Work} -\label{sec:related} -Cattell \cite{cattell2011scalable} maintains a great summary about existing Scalable SQL and -NoSQL data stores. In the landscape of distributed data stores, Druid -feature-wise sits somewhere between Google’s Dremel \cite{melnik2010dremel} and Powerdrill -\cite{hall2012processing}. Druid has most of the features implemented in Dremel (Dremel -handles arbitrary nested data structures while Druid only allows for a -single level of array-based nesting) and many of the interesting -compression algorithms mentioned in Powerdrill. - -Although Druid builds on many of the same principles as other -distributed column-oriented data stores \cite{fink2012distributed}, most existing data -stores are designed to be key-value stores \cite{lerner2010redis}, or -document/extensible record stores \cite{stonebraker2005c}. Such data stores are great -solutions for traditional data warehouse needs and general -back-office/reporting usage. Typically, analysts will query these data -stores and build reports from the results. In-memory databases such as -SAP’s HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb} are examples of other data stores that -are highly suited for traditional data warehousing needs. Druid is a -front-office system designed such that user-facing dashboards can be -built on top of it. Similar to \cite{paraccel2013}, Druid has analytical features -built in. The main features Druid offers over traditional data -warehousing solutions are real-time data ingestion, interactive -queries and interactive query latencies. In terms of real-time -ingestion and processing of data, Trident/Storm \cite{marz2013storm} and Streaming -Spark \cite{zaharia2012discretized} are other popular real-time computation systems, although -they lack the data storage capabilities of Druid. Spark/Shark \cite{engle2012shark} are -also doing similar work in the area of fast data analysis on large -scale data sets. Cloudera Impala \cite{cloudera2013} is another system focused on -optimizing querying performance, but more so in Hadoop environments. - -Druid leverages a unique combination of algorithms in its -architecture. Although we believe no other data store has the same set -of functionality as Druid, some of Druid’s features such as using -inverted indices to perform faster filters also exist in other data -stores \cite{macnicol2004sybase}. - -\section{Conclusions} -\label{sec:conclusions} -In this paper, we presented Druid, a distributed, column-oriented, -real-time analytical data store. Druid is a highly customizable -solution that is optimized for fast query latencies. Druid ingests -data in real-time and is fault-tolerant. We discussed the performance -of Druid on billion row data sets. We summarized key Druid architecture -aspects such as the storage format, query language and general -execution. In the future, we plan to cover more in depth the different -algorithms we’ve developed for Druid and how other systems may plug -into Druid to achieve powerful use cases. - -\section{Acknowledgements} -\label{sec:acknowledgements} -We want to thank Steve Harris and Jaypal Sethi for their feedback on improving this paper. -We also want to give recognition to Adam Smith; without him the first Metamarkets hackathon would not have -been organized and this paper would not have been created. Another special recognition to Katherine Chu for -helping to create all the images in this paper. - -Druid could not have been built without the help of many great -engineers at Metamarkets and in the community. We want to thank Danny Yuan, Jae Hyeon Bae, Paul Baclace, Dave -Nielsen, and Dhruv Parthasarathy for their -contributions to Druid. - -% The following two commands are all you need in the -% initial runs of your .tex file to -% produce the bibliography for the citations in your paper. -\bibliographystyle{abbrv} -\bibliography{druid} % druid.bib is the name of the Bibliography in this case -% You must have a proper ".bib" file -% and remember to run: -% latex bibtex latex latex -% to resolve all references - -%Generated by bibtex from your ~.bib file. Run latex, -%then bibtex, then latex twice (to resolve references). - -%APPENDIX is optional. -% ****************** APPENDIX ************************************** -% Example of an appendix; typically would start on a new page -%pagebreak - -\end{document}