\documentclass{acm_proc_article-sp}
\usepackage{graphicx}
\usepackage{balance}
\usepackage{fontspec}
\setmainfont[Ligatures={TeX}]{Times}
\usepackage{hyperref}
\graphicspath{{figures/}}

\hyphenation{metamarkets nelson}

\begin{document}

% ****************** TITLE ****************************************

\title{Druid: A Real-time Analytical Data Store}

% ****************** AUTHORS **************************************

\numberofauthors{6}
\author{
\alignauthor Fangjin Yang, Eric Tschetter, Gian Merlino, Nelson Ray, Xavier Léauté, Deep Ganguli, Himadri Singh\\
\email{\{fangjin, cheddar, gian, nelson, xavier, deep, himadri\}@metamarkets.com}
}
\date{21 March 2013}

\maketitle

\begin{abstract}
Druid is an open source\footnote{\href{https://github.com/metamx/druid}{https://github.com/metamx/druid}} data store designed for real-time exploratory analytics on large data sets. The system combines a column-oriented storage layout, a distributed, shared-nothing architecture, and an advanced indexing structure to allow for the arbitrary exploration of billion-row tables with sub-second latencies. In this paper, we describe Druid's architecture, and detail how it supports fast aggregations, flexible filters, and low latency data ingestion.
\end{abstract}

\section{Introduction}
In recent years, the proliferation of internet technology has created a surge in machine-generated events. Individually, these events contain minimal useful information and are of low value. Given the time and resources required to extract meaning from large collections of events, many companies were willing to discard this data instead. Although infrastructure has been built to handle event-based data (e.g. IBM's Netezza\cite{singh2011introduction}, HP's Vertica\cite{bear2012vertica}, and EMC's Greenplum\cite{miner2012unified}), these systems are largely sold at high price points and are targeted only at companies that can afford them.
A few years ago, Google introduced MapReduce \cite{dean2008mapreduce} as their mechanism of leveraging commodity hardware to index the internet and analyze logs. The Hadoop \cite{shvachko2010hadoop} project soon followed and was largely patterned after the insights that came out of the original MapReduce paper. Hadoop is currently deployed in many organizations to store and analyze large amounts of log data. Hadoop has contributed much to helping companies convert their low-value event streams into high-value aggregates for a variety of applications such as business intelligence and A-B testing. As with a lot of great systems, Hadoop has opened our eyes to a new space of problems. Specifically, Hadoop excels at storing and providing access to large amounts of data, however, it does not make any performance guarantees around how quickly that data can be accessed. Furthermore, although Hadoop is a highly available system, performance degrades under heavy concurrent load. Lastly, while Hadoop works well for storing data, it is not optimized for ingesting data and making that data immediately readable. Early on in the development of the Metamarkets product, we ran into each of these issues and came to the realization that Hadoop is a great back-office, batch processing, and data warehousing system. However, as a company that has product-level guarantees around query performance and data availability in a highly concurrent environment (1000+ users), Hadoop wasn't going to meet our needs. We explored different solutions in the space, and after trying both Relational Database Management Systems and NoSQL architectures, we came to the conclusion that there was nothing in the open source world that could be fully leveraged for our requirements. We ended up creating Druid, an open-source, distributed, column-oriented, realtime analytical data store. 
In many ways, Druid shares similarities with other interactive query systems \cite{melnik2010dremel}, main-memory databases \cite{farber2012sap}, and widely-known distributed data stores such as BigTable \cite{chang2008bigtable}, Dynamo \cite{decandia2007dynamo}, and Cassandra \cite{lakshman2010cassandra}. The distribution and query model also borrow ideas from current generation search infrastructure \cite{linkedin2013senseidb, apache2013solr, banon2013elasticsearch}. This paper describes the architecture of Druid, explores the various design decisions made in creating an always-on production system that powers a hosted service, and attempts to help inform anyone who faces a similar problem about a potential method of solving it. Druid is deployed in production at several technology companies\footnote{\href{http://druid.io/druid.html}{http://druid.io/druid.html}}. The structure of the paper is as follows: we first describe the problem in Section \ref{sec:problem-definition}. Next, we detail system architecture from the point of view of how data flows through the system in Section \ref{sec:architecture}. We then discuss how and why data gets converted into a binary format in Section \ref{sec:storage-format}. We briefly describe the query API in Section \ref{sec:query-api}. Lastly, we leave off with some benchmarks in Section \ref{sec:benchmarks}, related work in Section \ref{sec:related}, and conclusions in Section \ref{sec:conclusions}.

\section{Problem Definition}
\label{sec:problem-definition}
Druid was originally designed to solve problems around ingesting and exploring large quantities of transactional events (log data). This form of timeseries data is commonly found in OLAP workflows and the nature of the data tends to be very append heavy. For example, consider the data shown in Table~\ref{tab:sample_data}, which contains data for edits that have occurred on Wikipedia.
Each time a user edits a page in Wikipedia, an event is generated that contains metadata about the edit. This metadata comprises three distinct components. First, there is a timestamp column indicating when the edit was made. Next, there is a set of dimension columns indicating various attributes about the edit such as the page that was edited, the user who made the edit, and the location of the user. Finally, there is a set of metric columns that contain values (usually numeric) to aggregate over, such as the number of characters added or removed in an edit.

\begin{table*}
\centering
\caption{Sample Druid data for edits that have occurred on Wikipedia.}
\label{tab:sample_data}
\begin{tabular}{| l | l | l | l | l | l | l |}
\hline
\textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline
2011-01-01T01:00:00Z & Justin Bieber & Boxer & Male & San Francisco & 1800 & 25 \\ \hline
2011-01-01T01:00:00Z & Justin Bieber & Reach & Male & Waterloo & 2912 & 42 \\ \hline
2011-01-01T02:00:00Z & Ke\$ha & Helz & Male & Calgary & 1953 & 17 \\ \hline
2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline
\end{tabular}
\end{table*}

Our goal is to rapidly compute drill-downs and aggregates over this data. We want to answer questions like “How many edits were made on the page Justin Bieber from males in San Francisco?” and “What is the average number of characters that were added by people from Calgary over the span of a month?”. We also want queries over any arbitrary combination of dimensions to return with sub-second latencies. The need for Druid was motivated by the fact that existing open source Relational Database Management Systems and NoSQL key/value stores were unable to provide a low latency data ingestion and query platform for interactive applications \cite{tschetter2011druid}.
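To make the drill-down and aggregation questions above concrete, the following minimal Python sketch answers both over the four sample rows of Table~\ref{tab:sample_data}. It is a naive row scan for illustration only and is not how Druid executes queries; the names \texttt{Edit}, \texttt{count\_edits}, and \texttt{average\_added} are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Edit:
    timestamp: str  # timestamp column (ISO 8601)
    page: str       # dimension columns
    username: str
    gender: str
    city: str
    added: int      # metric columns
    removed: int


# The four sample rows from the table of Wikipedia edits.
EDITS = [
    Edit("2011-01-01T01:00:00Z", "Justin Bieber", "Boxer", "Male", "San Francisco", 1800, 25),
    Edit("2011-01-01T01:00:00Z", "Justin Bieber", "Reach", "Male", "Waterloo", 2912, 42),
    Edit("2011-01-01T02:00:00Z", "Ke$ha", "Helz", "Male", "Calgary", 1953, 17),
    Edit("2011-01-01T02:00:00Z", "Ke$ha", "Xeno", "Male", "Taiyuan", 3194, 170),
]


def matches(row, dims):
    """True when the row equals every dimension=value filter."""
    return all(getattr(row, name) == value for name, value in dims.items())


def count_edits(rows, **dims):
    """Count the rows that satisfy all dimension filters."""
    return sum(1 for row in rows if matches(row, dims))


def average_added(rows, **dims):
    """Average of the 'added' metric over matching rows, or None if none match."""
    values = [row.added for row in rows if matches(row, dims)]
    return sum(values) / len(values) if values else None


edits_sf = count_edits(EDITS, page="Justin Bieber", gender="Male", city="San Francisco")
avg_calgary = average_added(EDITS, city="Calgary")
```

A full scan like this touches every row and every column for each query, which is the cost that the storage format and indices described later are meant to avoid.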
In the early days of Metamarkets, we were focused on building a hosted dashboard that would allow users to arbitrarily explore and visualize event streams. The data store powering the dashboard needed to return queries fast enough that the data visualizations built on top of it could provide users with an interactive experience. In addition to the query latency needs, the system had to be multi-tenant and highly available. The Metamarkets product is used in a highly concurrent environment. Downtime is costly and many businesses cannot afford to wait if a system is unavailable in the face of software upgrades or network failure. Downtime for startups, who often lack proper internal operations management, can determine business success or failure. Finally, another key problem that Metamarkets faced in its early days was to allow users and alerting systems to make business decisions in “real-time”. The time from when an event is created to when that event is queryable determines how fast users and systems are able to react to potentially catastrophic occurrences in their systems. Popular open source data warehousing systems such as Hadoop were unable to provide the sub-second data ingestion latencies we required. The problems of data exploration, ingestion, and availability span multiple industries. Since Druid was open sourced in October 2012, it has been deployed as a video, network monitoring, operations monitoring, and online advertising analytics platform in multiple companies.

\section{Architecture}
\label{sec:architecture}
A Druid cluster consists of different types of nodes and each node type is designed to perform a specific set of tasks. We believe this design separates concerns and reduces the complexity of the system. The different node types operate fairly independently of each other and there is minimal interaction between them. Hence, intra-cluster communication failures have minimal impact on data availability.
To solve complex data analysis problems, the different node types come together to form a fully working system. The name Druid comes from the Druid class in many role-playing games: it is a shape-shifter, capable of taking on many different forms to fulfill various different roles in a group. The composition of and flow of data in a Druid cluster are shown in Figure~\ref{fig:cluster}. \begin{figure*} \centering \includegraphics[width = 4.5in]{cluster} \caption{An overview of a Druid cluster and the flow of data through the cluster.} \label{fig:cluster} \end{figure*} \subsection{Real-time Nodes} \label{sec:realtime} Real-time nodes encapsulate the functionality to ingest and query event streams. Events indexed via these nodes are immediately available for querying. The nodes are only concerned with events for some small time range and periodically hand off immutable batches of events they've collected over this small time range to other nodes in the Druid cluster that are specialized in dealing with batches of immutable events. Real-time nodes leverage Zookeeper \cite{hunt2010zookeeper} for coordination with the rest of the Druid cluster. The nodes announce their online state and the data they are serving in Zookeeper. Real-time nodes maintain an in-memory index buffer for all incoming events. These indexes are incrementally populated as new events are ingested and the indexes are also directly queryable. Druid virtually behaves as a row store for queries on events that exist in this JVM heap-based buffer. To avoid heap overflow problems, real-time nodes persist their in-memory indexes to disk either periodically or after some maximum row limit is reached. This persist process converts data stored in the in-memory buffer to a column oriented storage format described in Section \ref{sec:storage-format}. Each persisted index is immutable and real-time nodes load persisted indexes into off-heap memory such that they can still be queried. 
Figure~\ref{fig:realtime_flow} illustrates the process.

\begin{figure}
\centering
\includegraphics[width = 2.8in]{realtime_flow}
\caption{Real-time nodes first buffer events in memory. On a periodic basis, the in-memory index is persisted to disk. On another periodic basis, all persisted indexes are merged together and handed off. Queries for data will hit the in-memory index and the persisted indexes.}
\label{fig:realtime_flow}
\end{figure}

On a periodic basis, each real-time node will schedule a background task that searches for all locally persisted indexes. The task merges these indexes together and builds an immutable block of data that contains all the events that have been ingested by a real-time node for some span of time. We refer to this block of data as a “segment”. During the handoff stage, a real-time node uploads this segment to a permanent backup storage, typically a distributed file system such as S3 \cite{decandia2007dynamo} or HDFS \cite{shvachko2010hadoop}, which Druid refers to as “deep storage”. The ingest, persist, merge, and handoff steps are fluid; there is no data loss during any of the processes. To better understand the flow of data through a real-time node, consider the following example. First, we start a real-time node at 13:37. The node will only accept events for the current hour or the next hour. When the node begins ingesting events, it will announce that it is serving a segment of data for a time window from 13:00 to 14:00. Every 10 minutes (the persist period is configurable), the node will flush and persist its in-memory buffer to disk. Near the end of the hour, the node will likely see events with timestamps from 14:00 to 15:00. When this occurs, the node prepares to serve data for the next hour and creates a new in-memory index. The node then announces that it is also serving a segment for data from 14:00 to 15:00.
The node does not immediately merge the indexes it persisted from 13:00 to 14:00, instead it waits for a configurable window period for straggling events from 13:00 to 14:00 to come in. Having a window period minimizes the risk of data loss from delays in event delivery. At the end of the window period, the real-time node merges all persisted indexes from 13:00 to 14:00 into a single immutable segment and hands the segment off. Once this segment is loaded and queryable somewhere else in the Druid cluster, the real-time node flushes all information about the data it collected for 13:00 to 14:00 and unannounces it is serving this data. This process is shown in Figure~\ref{fig:realtime_timeline}. \begin{figure*} \centering \includegraphics[width = 4.5in]{realtime_timeline} \caption{The node starts, ingests data, persists, and periodically hands data off. This process repeats indefinitely. The time intervals between different real-time node operations are configurable.} \label{fig:realtime_timeline} \end{figure*} \subsubsection{Availability and Scalability} Real-time nodes are a consumer of data and require a corresponding producer to provide the data stream. Commonly, for data durability purposes, a message bus such as Kafka \cite{kreps2011kafka} sits between the producer and the real-time node as shown in Figure~\ref{fig:realtime_pipeline}. Real-time nodes ingest data by reading events from the message bus. The time from event creation to event consumption is ordinarily on the order of hundreds of milliseconds. \begin{figure} \centering \includegraphics[width = 2.8in]{realtime_pipeline} \caption{Multiple real-time nodes can read from the same message bus. Each node maintains its own offset.} \label{fig:realtime_pipeline} \end{figure} The purpose of the message bus in Figure~\ref{fig:realtime_pipeline} is two-fold. First, the message bus acts as a buffer for incoming events. 
A message bus such as Kafka maintains offsets indicating the position in an event stream that a consumer (a real-time node) has read up to, and consumers can programmatically update these offsets. Typically, real-time nodes update this offset each time they persist their in-memory buffers to disk. In a fail and recover scenario, if a node has not lost disk, it can reload all persisted indexes from disk and continue reading events from the last offset it committed. Ingesting events from a recently committed offset greatly reduces a node's recovery time. In practice, we see real-time nodes recover from such failure scenarios in a matter of seconds. The second purpose of the message bus is to act as a single endpoint from which multiple real-time nodes can read events. Multiple real-time nodes can ingest the same set of events from the bus, thus replicating the events. In a scenario where a node completely fails and does not recover, replicated streams ensure that no data is lost. A single ingestion endpoint also allows for data streams to be partitioned such that multiple real-time nodes each ingest a portion of a stream. This allows additional real-time nodes to be seamlessly added. In practice, this model has allowed one of the largest production Druid clusters to consume raw data at approximately 500 MB/s (150,000 events/s or 2 TB/hour).

\subsection{Historical Nodes}
Historical nodes encapsulate the functionality to load and serve the immutable blocks of data (segments) created by real-time nodes. In many real-world workflows, most of the data loaded in a Druid cluster is immutable and hence, historical nodes are typically the main workers of a Druid cluster. Historical nodes follow a shared-nothing architecture and there is no single point of contention among the nodes. The nodes have no knowledge of one another and are operationally simple; they only know how to load, drop, and serve immutable segments.
Similar to real-time nodes, historical nodes announce their online state and the data they are serving in Zookeeper. Instructions to load and drop segments are sent over Zookeeper and contain information about where the segment is located in deep storage and how to decompress and process the segment. Before a historical node downloads a particular segment from deep storage, it first checks a local cache that maintains information about what segments already exist on the node. If information about a segment is not present in the cache, the historical node will proceed to download the segment from deep storage. This process is shown in Figure~\ref{fig:historical_download}. Once processing is complete, the segment is announced in Zookeeper. At this point, the segment is queryable. The local cache also allows for historical nodes to be quickly updated and restarted. On startup, the node examines its cache and immediately serves whatever data it finds. \begin{figure} \centering \includegraphics[width = 2.6in]{historical_download} \caption{Historical nodes download immutable segments from deep storage. Segments must be loaded in memory before they can be queried.} \label{fig:historical_download} \end{figure} Historical nodes can support read consistency because they only deal with immutable data. Immutable data blocks also enable a simple parallelization model: historical nodes can concurrently scan and aggregate immutable blocks without blocking. \subsubsection{Tiers} \label{sec:tiers} Historical nodes can be grouped in different tiers, where all nodes in a given tier are identically configured. Different performance and fault-tolerance parameters can be set for each tier. The purpose of tiered nodes is to enable higher or lower priority segments to be distributed according to their importance. For example, it is possible to spin up a “hot” tier of historical nodes that have a high number of cores and large memory capacity. 
The “hot” cluster can be configured to download more frequently accessed data. A parallel “cold” cluster can also be created with much less powerful backing hardware. The “cold” cluster would only contain less frequently accessed segments.

\subsubsection{Availability}
Historical nodes depend on Zookeeper for segment load and unload instructions. If Zookeeper becomes unavailable, historical nodes are no longer able to serve new data or drop outdated data. However, because queries are served over HTTP, historical nodes are still able to respond to query requests for the data they are currently serving. This means that Zookeeper outages do not impact current data availability on historical nodes.

\subsection{Broker Nodes}
Broker nodes act as query routers to historical and real-time nodes. Broker nodes understand the metadata published in Zookeeper about what segments are queryable and where those segments are located. Broker nodes route incoming queries such that the queries hit the right historical or real-time nodes. Broker nodes also merge partial results from historical and real-time nodes before returning a final consolidated result to the caller.

\subsubsection{Caching}
\label{sec:caching}
Broker nodes contain a cache with an LRU \cite{o1993lru, kim2001lrfu} invalidation strategy. The cache can use local heap memory or an external distributed key/value store such as memcached \cite{fitzpatrick2004distributed}. Each time a broker node receives a query, it first maps the query to a set of segments. Results for certain segments may already exist in the cache and there is no need to recompute them. For any results that do not exist in the cache, the broker node will forward the query to the correct historical and real-time nodes. Once historical nodes return their results, the broker will cache these results on a per segment basis for future use. This process is illustrated in Figure~\ref{fig:caching}.
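The per-segment caching flow can be sketched in a few lines of Python. This is a simplified illustration under stated assumptions, not Druid's implementation: the class \texttt{SegmentResultCache}, the function \texttt{run\_query}, and the \texttt{compute\_on\_node} callback are hypothetical names, partial results are plain numbers merged by summation, and results for real-time segments bypass the cache.

```python
from collections import OrderedDict


class SegmentResultCache:
    """Tiny LRU cache keyed by (query key, segment id). Hypothetical helper."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._entries = OrderedDict()

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, key, value):
        self._entries[key] = value
        self._entries.move_to_end(key)
        if len(self._entries) > self.capacity:
            self._entries.popitem(last=False)  # evict least recently used


def run_query(query_key, segments, cache, compute_on_node):
    """Answer a query over `segments`, a list of (segment_id, is_realtime).

    Cached per-segment results are reused; misses are forwarded to
    `compute_on_node`. Real-time results are never cached.
    """
    partials = []
    for segment_id, is_realtime in segments:
        result = None if is_realtime else cache.get((query_key, segment_id))
        if result is None:
            result = compute_on_node(query_key, segment_id)
            if not is_realtime:
                cache.put((query_key, segment_id), result)
        partials.append(result)
    return sum(partials)  # merge step; here partial results are simple counts
```

Keying the cache by segment rather than by whole query means that two queries with the same filters but overlapping time ranges can share the cached results for the segments they have in common.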
Real-time data is never cached and hence requests for real-time data will always be forwarded to real-time nodes. Real-time data is perpetually changing and caching the results would be unreliable.

\begin{figure*}
\centering
\includegraphics[width = 4.5in]{caching}
\caption{Broker nodes cache per segment results. Every Druid query is mapped to a set of segments. Queries often combine cached segment results with those that need to be computed on historical and real-time nodes.}
\label{fig:caching}
\end{figure*}

The cache also acts as an additional level of data durability. In the event that all historical nodes fail, it is still possible to query results if those results already exist in the cache.

\subsubsection{Availability}
In the event of a total Zookeeper outage, data is still queryable. If broker nodes are unable to communicate with Zookeeper, they use their last known view of the cluster and continue to forward queries to real-time and historical nodes. Broker nodes make the assumption that the structure of the cluster is the same as it was before the outage. In practice, this availability model has allowed our Druid cluster to continue serving queries for a significant period of time while we diagnosed Zookeeper outages.

\subsection{Coordinator Nodes}
Druid coordinator nodes are primarily in charge of data management and distribution on historical nodes. The coordinator nodes tell historical nodes to load new data, drop outdated data, replicate data, and move data to load balance. Druid uses a multi-version concurrency control swapping protocol for managing immutable segments in order to maintain stable views. If any immutable segment contains data that is wholly obsoleted by newer segments, the outdated segment is dropped from the cluster. Coordinator nodes undergo a leader-election process that determines a single node that runs the coordinator functionality. The remaining coordinator nodes act as redundant backups.
A coordinator node runs periodically to determine the current state of the cluster. It makes decisions by comparing the expected state of the cluster with the actual state of the cluster at the time of the run. As with all Druid nodes, coordinator nodes maintain a Zookeeper connection for current cluster information. Coordinator nodes also maintain a connection to a MySQL database that contains additional operational parameters and configurations. One of the key pieces of information located in the MySQL database is a table that contains a list of all segments that should be served by historical nodes. This table can be updated by any service that creates segments, for example, real-time nodes. The MySQL database also contains a rule table that governs how segments are created, destroyed, and replicated in the cluster. \subsubsection{Rules} Rules govern how historical segments are loaded and dropped from the cluster. Rules indicate how segments should be assigned to different historical node tiers and how many replicates of a segment should exist in each tier. Rules may also indicate when segments should be dropped entirely from the cluster. Rules are usually set for a period of time. For example, a user may use rules to load the most recent one month's worth of segments into a "hot" cluster, the most recent one year's worth of segments into a "cold" cluster, and drop any segments that are older. The coordinator nodes load a set of rules from a rule table in the MySQL database. Rules may be specific to a certain data source and/or a default set of rules may be configured. The coordinator node will cycle through all available segments and match each segment with the first rule that applies to it. \subsubsection{Load Balancing} In a typical production environment, queries often hit dozens or even hundreds of segments. Since each historical node has limited resources, segments must be distributed among the cluster to ensure that the cluster load is not too imbalanced. 
Determining optimal load distribution requires some knowledge about query patterns and speeds. Typically, queries cover recent segments spanning contiguous time intervals for a single data source. On average, queries that access smaller segments are faster. These query patterns suggest replicating recent historical segments at a higher rate, spreading out large segments that are close in time to different historical nodes, and co-locating segments from different data sources. To optimally distribute and balance segments among the cluster, we developed a cost-based optimization procedure that takes into account the segment data source, recency, and size. The exact details of the algorithm are beyond the scope of this paper and may be discussed in future literature. \subsubsection{Replication} Coordinator nodes may tell different historical nodes to load copies of the same segment. The number of replicates in each tier of the historical compute cluster is fully configurable. Setups that require high levels of fault tolerance can be configured to have a high number of replicas. Replicated segments are treated the same as the originals and follow the same load distribution algorithm. By replicating segments, single historical node failures are transparent in the Druid cluster. We use this property for software upgrades. We can seamlessly take a historical node offline, update it, bring it back up, and repeat the process for every historical node in a cluster. Over the last two years, we have never taken downtime in our Druid cluster for software upgrades. \subsubsection{Availability} Druid coordinator nodes have two external dependencies: Zookeeper and MySQL. Coordinator nodes rely on Zookeeper to determine what historical nodes already exist in the cluster. If Zookeeper becomes unavailable, the coordinator will no longer be able to send instructions to assign, balance, and drop segments. However, these operations do not affect data availability at all. 
The design principle for responding to MySQL and Zookeeper failures is the same: if an external dependency responsible for coordination fails, the cluster maintains the status quo. Druid uses MySQL to store operational management information and segment metadata information about what segments should exist in the cluster. If MySQL goes down, this information becomes unavailable to coordinator nodes. However, this does not mean data itself is unavailable. If coordinator nodes cannot communicate to MySQL, they will cease to assign new segments and drop outdated ones. Broker, historical and real-time nodes are still queryable during MySQL outages. \section{Storage Format} \label{sec:storage-format} Data tables in Druid (called \emph{data sources}) are collections of timestamped events and partitioned into a set of segments, where each segment is typically 5--10 million rows. Formally, we define a segment as a collection of rows of data that span some period in time. Segments represent the fundamental storage unit in Druid and replication and distribution are done at a segment level. Druid always requires a timestamp column as a method of simplifying data distribution policies, data retention policies, and first-level query pruning. Druid partitions its data sources into well-defined time intervals, typically an hour or a day, and may further partition on values from other columns to achieve the desired segment size. For example, partitioning the data in Table~\ref{tab:sample_data} by hour results in two segments for 2011-01-01, and partitioning the data by day results in a single segment. The time granularity to partition segments is a function of data volume and time range. A data set with timestamps spread over a year is better partitioned by day, and a data set with timestamps spread over a day is better partitioned by hour. 
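As an illustration of time-based partitioning, the sketch below groups the four timestamps of Table~\ref{tab:sample_data} into hourly and daily buckets. It only mirrors the arithmetic of the example above; the function names are hypothetical and real Druid segments carry far more than a list of row numbers.

```python
from collections import defaultdict
from datetime import datetime, timezone

# Timestamps of the four sample rows, in row order.
TIMESTAMPS = [
    "2011-01-01T01:00:00Z",
    "2011-01-01T01:00:00Z",
    "2011-01-01T02:00:00Z",
    "2011-01-01T02:00:00Z",
]


def parse(ts):
    """Parse an ISO 8601 UTC timestamp of the form used in the sample data."""
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


def bucket_start(ts, granularity):
    """Truncate a timestamp to the start of its hour or day."""
    t = parse(ts)
    if granularity == "hour":
        return t.replace(minute=0, second=0, microsecond=0)
    if granularity == "day":
        return t.replace(hour=0, minute=0, second=0, microsecond=0)
    raise ValueError("unsupported granularity: " + granularity)


def partition(timestamps, granularity):
    """Map each time bucket to the list of row numbers that fall into it."""
    segments = defaultdict(list)
    for row, ts in enumerate(timestamps):
        segments[bucket_start(ts, granularity)].append(row)
    return dict(segments)


hourly = partition(TIMESTAMPS, "hour")  # two buckets: 01:00 and 02:00
daily = partition(TIMESTAMPS, "day")    # one bucket: 2011-01-01
```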
Segments are uniquely identified by a data source identifier, the time interval of the data, and a version string that increases whenever a new segment is created. The version string indicates the freshness of segment data; segments with later versions have newer views of data (over some time range) than segments with older versions. This segment metadata is used by the system for concurrency control; read operations always access data in a particular time range from the segments with the latest version identifiers for that time range. Druid segments are stored in a column orientation. Given that Druid is best used for aggregating event streams (all data going into Druid must have a timestamp), the advantages of storing aggregate information as columns rather than rows are well documented \cite{abadi2008column}. Column storage allows for more efficient CPU usage as only what is needed is actually loaded and scanned. In a row oriented data store, all columns associated with a row must be scanned as part of an aggregation. The additional scan time can introduce significant performance degradations \cite{abadi2008column}. Druid has multiple column types to represent various data formats. Depending on the column type, different compression methods are used to reduce the cost of storing a column in memory and on disk. In the example given in Table~\ref{tab:sample_data}, the page, user, gender, and city columns only contain strings. Storing strings directly is unnecessarily costly and string columns can be dictionary encoded instead. Dictionary encoding is a common method to compress data and has been used in other data stores such as PowerDrill \cite{hall2012processing}. In the example in Table~\ref{tab:sample_data}, we can map each page to a unique integer identifier.
\begin{verbatim}
Justin Bieber -> 0
Ke$ha         -> 1
\end{verbatim}
This mapping allows us to represent the page column as an integer array where the array indices correspond to the rows of the original data set.
For the page column, we can represent the unique pages as follows:
\begin{verbatim}
[0, 0, 1, 1]
\end{verbatim}
The resulting integer array lends itself very well to compression methods. Generic compression algorithms on top of encodings are extremely common in column-stores. Druid uses the LZF \cite{liblzf2013} compression algorithm. Similar compression methods can be applied to numeric columns. For example, the characters added and characters removed columns in Table~\ref{tab:sample_data} can also be expressed as individual arrays.
\begin{verbatim}
Characters Added   -> [1800, 2912, 1953, 3194]
Characters Removed -> [25, 42, 17, 170]
\end{verbatim}
In this case, we compress the raw values as opposed to their dictionary representations.

\subsection{Indices for Filtering Data}
In many real world OLAP workflows, queries are issued for the aggregated results of some set of metrics where some set of dimension specifications are met. An example query is: “How many Wikipedia edits were done by users in San Francisco who are also male?”. This query is filtering the Wikipedia data set in Table~\ref{tab:sample_data} based on a Boolean expression of dimension values. In many real world data sets, dimension columns contain strings and metric columns contain numeric values. Druid creates additional lookup indices for string columns such that only those rows that pertain to a particular query filter are ever scanned. Let us consider the page column in Table~\ref{tab:sample_data}. For each unique page in Table~\ref{tab:sample_data}, we can form some representation indicating in which table rows a particular page is seen. We can store this information in a binary array where the array indices represent our rows. If a particular page is seen in a certain row, that array index is marked as \texttt{1}.
For example:
\begin{verbatim}
Justin Bieber -> rows [0, 1] -> [1][1][0][0]
Ke$ha         -> rows [2, 3] -> [0][0][1][1]
\end{verbatim}

\texttt{Justin Bieber} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values to row indices forms an inverted index \cite{tomasic1993performance}. To know which rows contain \texttt{Justin Bieber} or \texttt{Ke\$ha}, we can \texttt{OR} together the two arrays.
\begin{verbatim}
[1][1][0][0] OR [0][0][1][1] = [1][1][1][1]
\end{verbatim}

\begin{figure}
\centering
\includegraphics[width = 3in]{concise_plot}
\caption{Integer array size versus Concise set size.}
\label{fig:concise_plot}
\end{figure}

This approach of performing Boolean operations on large bitmap sets is commonly used in search engines. Bitmap compression algorithms are a well-defined area of research and often utilize run-length encoding. Popular algorithms include Byte-aligned Bitmap Code \cite{antoshenkov1995byte}, Word-Aligned Hybrid (WAH) code \cite{wu2006optimizing}, and Partitioned Word-Aligned Hybrid (PWAH) compression \cite{van2011memory}. Druid opted to use the Concise algorithm \cite{colantonio2010concise} as it can outperform WAH by reducing the size of the compressed bitmaps by up to 50\%. Figure~\ref{fig:concise_plot} illustrates the number of bytes used by Concise compression versus an integer array. The results were generated on a cc2.8xlarge system with a single thread, a 2GB heap, a 512MB young generation, and a forced GC between each run. The data set is a single day's worth of data collected from the Twitter garden hose \cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12 dimensions of varying cardinality. As an additional comparison, we also re-sorted the data set rows to maximize compression.

In the unsorted case, the total Concise size was 53,451,144 bytes and the total integer array size was 127,248,520 bytes. Overall, Concise compressed sets are about 42\% of the size of the integer arrays, or roughly 58\% smaller.
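
As a check on the unsorted figures quoted above, the ratio of the total Concise size to the total integer array size is
\[
\frac{53{,}451{,}144}{127{,}248{,}520} \approx 0.42,
\]
so the Concise sets occupy roughly 42\% of the space taken by the integer arrays, a saving of about 58\%.
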
In the sorted case, the total Concise compressed size was 43,832,884 bytes and the total integer array size was 127,248,520 bytes. What is interesting to note is that after sorting, global compression only increased minimally. The ratio of total Concise set size to total integer array size is 34\%. It is also interesting to note that as the cardinality of a dimension approaches the total number of rows in a data set, integer arrays require less space than Concise sets and become a better alternative.

\subsection{Storage Engine}
Druid's persistence components allow different storage engines to be plugged in, similar to Dynamo \cite{decandia2007dynamo}. These storage engines may store data in an entirely in-memory structure such as the JVM heap or in memory-mapped structures. The ability to swap storage engines allows Druid to be configured according to a particular application's specifications. An in-memory storage engine may be operationally more expensive than a memory-mapped storage engine but could be a better alternative if performance is critical. By default, a memory-mapped storage engine is used.

When using a memory-mapped storage engine, Druid relies on the operating system to page segments in and out of memory. Given that segments can only be scanned if they are loaded in memory, a memory-mapped storage engine allows recent segments to remain in memory whereas segments that are never queried are paged out. The main drawback of the memory-mapped storage engine arises when a query requires more segments to be paged into memory than a given node has capacity for. In this case, query performance will suffer from the cost of paging segments in and out of memory.

\section{Query API}
\label{sec:query-api}
Druid has its own query language and accepts queries as POST requests. Broker, historical, and real-time nodes all share the same query API.

The body of the POST request is a JSON object containing key-value pairs specifying various query parameters.
A typical query will contain the data source name, the granularity of the result data, the time range of interest, the type of request, and the metrics to aggregate over. The result will also be a JSON object containing the aggregated metrics over the time period.

Most query types will also support a filter set. A filter set is a Boolean expression of dimension name and value pairs. Any number and combination of dimensions and values may be specified. When a filter set is provided, only the subset of the data that pertains to the filter set will be scanned. The ability to handle complex nested filter sets is what enables Druid to drill into data at any depth.

The exact query syntax depends on the query type and the information requested. A sample count query over a week of data is shown below:
\begin{verbatim}
{
  "queryType"    : "timeseries",
  "dataSource"   : "wikipedia",
  "intervals"    : "2013-01-01/2013-01-08",
  "filter"       : {
    "type"      : "selector",
    "dimension" : "page",
    "value"     : "Ke$ha"
  },
  "granularity"  : "day",
  "aggregations" : [
    { "type" : "count", "name" : "rows" }
  ]
}
\end{verbatim}

The query shown above will return a count of the number of rows in the Wikipedia data source from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the ``page'' dimension is equal to ``Ke\$ha''. The results will be bucketed by day and will be a JSON array of the following form:
\begin{verbatim}
[
  {
    "timestamp": "2013-01-01T00:00:00.000Z",
    "result": { "rows": 393298 }
  },
  {
    "timestamp": "2013-01-02T00:00:00.000Z",
    "result": { "rows": 382932 }
  },
  ...
  {
    "timestamp": "2013-01-07T00:00:00.000Z",
    "result": { "rows": 1337 }
  }
]
\end{verbatim}

Druid supports many types of aggregations including double sums, long sums, minimums, maximums, and several others. Druid also supports complex aggregations such as cardinality estimation and approximate quantile estimation. The results of aggregations can be combined in mathematical expressions to form other aggregations.
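
From a client's point of view, the request body and the response shown above are ordinary JSON. The following Python sketch builds the sample query and sums the daily counts from a response of the same shape. The helper names \texttt{build\_count\_query} and \texttt{total\_rows} are hypothetical, the response fragment is an assumed sample, and no HTTP call is made because the endpoint of a given deployment is not specified here.
\begin{verbatim}
```python
import json


def build_count_query(data_source, interval, dimension, value,
                      granularity="day"):
    """Build a timeseries count query body (hypothetical helper)."""
    return {
        "queryType": "timeseries",
        "dataSource": data_source,
        "intervals": interval,
        "filter": {
            "type": "selector",
            "dimension": dimension,
            "value": value,
        },
        "granularity": granularity,
        "aggregations": [{"type": "count", "name": "rows"}],
    }


def total_rows(results):
    """Sum the "rows" aggregate over all time buckets of a response."""
    return sum(bucket["result"]["rows"] for bucket in results)


query = build_count_query(
    "wikipedia", "2013-01-01/2013-01-08", "page", "Ke$ha")
body = json.dumps(query)  # string that would be sent as the POST body

# Assumed response fragment with the same shape as the sample result.
sample_response = json.loads(
    '[{"timestamp": "2013-01-01T00:00:00.000Z",'
    '  "result": {"rows": 393298}},'
    ' {"timestamp": "2013-01-02T00:00:00.000Z",'
    '  "result": {"rows": 382932}}]'
)
week_total = total_rows(sample_response)
```
\end{verbatim}
Keeping query construction separate from transport makes it straightforward to send the same body to any node type, since all of them share the query API.
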
The query API is highly customizable and can be extended to filter and group results based on almost any arbitrary condition. It is beyond the scope of this paper to fully describe the query API but more information can be found online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}. We are also in the process of extending the Druid API to understand SQL.

\section{Performance Benchmarks}
\label{sec:benchmarks}
To illustrate Druid's performance, we conducted a series of experiments that focused on measuring Druid's query and data ingestion capabilities.

\subsection{Query Performance}
To benchmark Druid query performance, we created a large test cluster with 6TB of uncompressed data, representing tens of billions of fact rows. The data set contained more than a dozen dimensions, with cardinalities ranging from the double digits to tens of millions. We computed four metrics for each row (counts, sums, and averages). The data was sharded first on timestamp and then on dimension values, creating thousands of shards of roughly 8 million fact rows apiece.

The cluster used in the benchmark consisted of 100 historical nodes, each with 16 cores, 60GB of RAM, 10 Gigabit Ethernet, and 1TB of disk space. Collectively, the cluster comprised 1600 cores, 6TB of RAM, sufficiently fast Ethernet, and more than enough disk space.

The benchmark queries, expressed as SQL statements, are listed in Table~\ref{tab:sql_queries}. These queries are meant to represent some common queries that are made against Druid for typical data analysis workflows. Although Druid has its own query language, we chose to translate the queries into SQL to better describe what the queries are doing. Please note:
\begin{itemize}
\item The timestamp range of the queries encompassed all data.
\item Each machine was a 16-core machine with 60GB RAM and 1TB of local disk. The machine was configured to only use 15 threads for processing queries.
\item A memory-mapped storage engine was used (the machine was configured to memory map the data instead of loading it into the Java heap).
\end{itemize}

\begin{table*}
\centering
\caption{Druid Queries}
\label{tab:sql_queries}
\begin{tabular}{| l | p{15cm} |}
\hline
\textbf{Query \#} & \textbf{Query} \\ \hline
1 & \texttt{SELECT count(*) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
2 & \texttt{SELECT count(*), sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
3 & \texttt{SELECT count(*), sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ?} \\ \hline
4 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
5 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
6 & \texttt{SELECT high\_card\_dimension, count(*) AS cnt, sum(metric1), sum(metric2), sum(metric3), sum(metric4) FROM \_table\_ WHERE timestamp $\geq$ ? AND timestamp < ? GROUP BY high\_card\_dimension ORDER BY cnt limit 100} \\ \hline
\end{tabular}
\end{table*}

Figure~\ref{fig:cluster_scan_rate} shows the cluster scan rate and Figure~\ref{fig:core_scan_rate} shows the core scan rate. In Figure~\ref{fig:cluster_scan_rate} we also include projected linear scaling based on the results of the 25 node cluster. In particular, we observe diminishing marginal returns to performance in the size of the cluster. Under linear scaling, the first SQL count query (query 1) would have achieved a speed of 37 billion rows per second on our 75 node cluster. In fact, the speed was 26 billion rows per second. However, queries 2-6 maintain a near-linear speedup up to 50 nodes: the core scan rates in Figure~\ref{fig:core_scan_rate} remain nearly constant.
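
For query 1, the shortfall relative to the linear projection can be expressed as a parallel efficiency, using only the two scan rates quoted above:
\[
\frac{26 \times 10^{9}\ \mathrm{rows/s}}{37 \times 10^{9}\ \mathrm{rows/s}} \approx 0.70,
\]
that is, the 75 node cluster delivered about 70\% of the projected scan rate for this query.
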
The increase in speed of a parallel computing system is often limited by the time needed for the sequential operations of the system, in accordance with Amdahl's law \cite{amdahl1967validity}.

\begin{figure}
\centering
\includegraphics[width = 2.8in]{cluster_scan_rate}
\caption{Druid cluster scan rate with lines indicating linear scaling from 25 nodes.}
\label{fig:cluster_scan_rate}
\end{figure}

\begin{figure}
\centering
\includegraphics[width = 2.8in]{core_scan_rate}
\caption{Druid core scan rate.}
\label{fig:core_scan_rate}
\end{figure}

The first query listed in Table~\ref{tab:sql_queries} is a simple count, achieving scan rates of 33M rows/second/core. We believe the 75 node cluster was actually overprovisioned for the test data set, explaining the modest improvement over the 50 node cluster. Druid's concurrency model is based on shards: one thread will scan one shard. If the number of segments on a historical node modulo the number of cores is small (e.g. 17 segments and 15 cores), then many of the cores will be idle during the last round of the computation.

When we include more aggregations we see performance degrade. This is because of the column-oriented storage format Druid employs. For the \texttt{count(*)} queries, Druid only has to check the timestamp column to satisfy the ``where'' clause. As we add metrics, it also has to load those metric values and scan over them, increasing the amount of memory scanned.

\subsection{Data Ingestion Performance}
To measure Druid's data ingestion latency, we spun up a single real-time node with the following configuration:
\begin{itemize}
\item JVM arguments: \texttt{-Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+HeapDumpOnOutOfMemoryError}
\item CPU: 2.3 GHz Intel Core i7
\end{itemize}

Druid's data ingestion latency is heavily dependent on the complexity of the data set being ingested.
The data complexity is determined by the number of dimensions in each event, the number of metrics in each event, and the types of aggregations we want to perform on those metrics. With the most basic data set (one that only has a timestamp column), our setup can ingest data at a rate of 800k events/sec/node, which is really just a measurement of how fast we can deserialize events. Real-world data sets are never this simple. To simulate real-world ingestion rates, we created a data set with 5 dimensions and a single metric. Four of the five dimensions have a cardinality less than 100, and we varied the cardinality of the final dimension. The results of varying the cardinality of a dimension are shown in Figure~\ref{fig:throughput_vs_cardinality}.

\begin{figure}
\centering
\includegraphics[width = 2.8in]{throughput_vs_cardinality}
\caption{When we vary the cardinality of a single dimension, we can see monotonically decreasing throughput.}
\label{fig:throughput_vs_cardinality}
\end{figure}

In Figure~\ref{fig:throughput_vs_num_dims}, we instead vary the number of dimensions in our data set. Each dimension has a cardinality less than 100. We can see a similar decline in ingestion throughput as the number of dimensions increases.

\begin{figure}
\centering
\includegraphics[width = 2.8in]{throughput_vs_num_dims}
\caption{Increasing the number of dimensions of our data set also leads to a decline in throughput.}
\label{fig:throughput_vs_num_dims}
\end{figure}

Finally, keeping our number of dimensions constant at 5, with four dimensions having a cardinality in the 0-100 range and the final dimension having a cardinality of 10,000, we can see a similar decline in throughput when we increase the number of metrics/aggregators in the data set. We used random types of metrics/aggregators in this experiment, ranging from longs and doubles to other more complex types. The randomization introduces more noise in the results, leading to a graph that is not strictly decreasing.
These results are shown in Figure~\ref{fig:throughput_vs_num_metrics}. For most real-world data sets, the number of metrics tends to be less than the number of dimensions. Hence, we can see that introducing a few new metrics does not impact the ingestion throughput as severely as the changes shown in the other graphs.

\begin{figure}
\centering
\includegraphics[width = 2.8in]{throughput_vs_num_metrics}
\caption{Adding new metrics to a data set decreases ingestion throughput. In most real-world data sets, the number of metrics in a data set tends to be lower than the number of dimensions.}
\label{fig:throughput_vs_num_metrics}
\end{figure}

\section{Related Work}
\label{sec:related}
Cattell \cite{cattell2011scalable} maintains a great summary of existing scalable SQL and NoSQL data stores. Hu \cite{hu2011stream} contributed another great summary for streaming databases. In terms of features, Druid sits somewhere between Google's Dremel \cite{melnik2010dremel} and PowerDrill \cite{hall2012processing}. Druid has most of the features implemented in Dremel (Dremel handles arbitrary nested data structures while Druid only allows for a single level of array-based nesting) and many of the interesting compression algorithms mentioned in PowerDrill.

Although Druid builds on many of the same principles as other distributed columnar data stores \cite{fink2012distributed}, many of these data stores are designed to be more generic key-value stores \cite{stonebraker2005c} and do not support computation directly in the storage layer. There are also other data stores designed for some of the same data warehousing issues that Druid is meant to solve. These systems include in-memory databases such as SAP's HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb}. These data stores lack Druid's low latency ingestion characteristics. Druid also has native analytical features baked in, similar to ParAccel \cite{paraccel2013}; however, Druid allows system-wide rolling software updates with no downtime.
Druid's low latency data ingestion features share some similarities with Trident/Storm \cite{marz2013storm} and Streaming Spark \cite{zaharia2012discretized}; however, both systems are focused on stream processing whereas Druid is focused on ingestion and aggregation. Stream processors are great complements to Druid as a means of pre-processing the data before the data enters Druid.

There is a class of systems that specialize in queries on top of cluster computing frameworks. Shark \cite{engle2012shark} is such a system for queries on top of Spark, and Cloudera's Impala \cite{cloudera2013} is another system focused on optimizing query performance on top of HDFS. Druid historical nodes download data locally and only work with native Druid indexes. We believe this setup allows for faster query latencies.

Druid leverages a unique combination of algorithms in its architecture. Although we believe no other data store has the same set of functionality as Druid, some of Druid's optimization techniques, such as using inverted indices to perform fast filters, are also used in other data stores \cite{macnicol2004sybase}.

\section{Conclusions}
\label{sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented, real-time analytical data store. Druid is designed to power high performance applications and is optimized for low query latencies. Druid supports streaming data ingestion and is fault-tolerant. We discussed how Druid was able to scan 27 billion rows in a second. We summarized key architecture aspects such as the storage format, query language, and general execution. In the future, we plan to cover in greater detail the different algorithms we've developed for Druid and how other systems may plug into Druid.

\balance

\section{Acknowledgements}
\label{sec:acknowledgements}
Druid could not have been built without the help of many great engineers at Metamarkets and in the community.
We want to thank everyone who has contributed to the Druid codebase for their invaluable support.

% The following two commands are all you need in the
% initial runs of your .tex file to
% produce the bibliography for the citations in your paper.
\bibliographystyle{abbrv}
\bibliography{druid}
% druid.bib is the name of the Bibliography in this case
% You must have a proper ".bib" file
% and remember to run:
% latex bibtex latex latex
% to resolve all references

%Generated by bibtex from your ~.bib file. Run latex,
%then bibtex, then latex twice (to resolve references).

%APPENDIX is optional.
% ****************** APPENDIX **************************************
% Example of an appendix; typically would start on a new page
%pagebreak

\end{document}