diff --git a/publications/whitepaper/druid.tex b/publications/whitepaper/druid.tex index 93a55faf023..1bce7339599 100644 --- a/publications/whitepaper/druid.tex +++ b/publications/whitepaper/druid.tex @@ -60,28 +60,28 @@ \maketitle -\begin{abstract} +\begin{abstract} Druid is an open source\footnote{\href{http://druid.io/}{http://druid.io/} \href{https://github.com/metamx/druid}{https://github.com/metamx/druid}} data store designed for real-time exploratory analytics on large data sets. The system combines a column-oriented storage layout, a distributed, shared-nothing architecture, and an advanced indexing structure to allow for the arbitrary exploration of billion-row tables with sub-second latencies. In this paper, we describe Druid's architecture, and detail how it supports fast -aggregations, flexible filters, and low latency data ingestion. +aggregations, flexible filters, and low latency data ingestion. \end{abstract} % A category with the (minimum) three required fields \category{H.2.4}{Database Management}{Systems}[Distributed databases] % \category{D.2.8}{Software Engineering}{Metrics}[complexity measures, performance measures] -\keywords{distributed; real-time; fault-tolerant; analytics; column-oriented; OLAP} +\keywords{distributed; real-time; fault-tolerant; highly available; open source; analytics; column-oriented; OLAP} -\section{Introduction} +\section{Introduction} In recent years, the proliferation of internet technology has -created a surge in machine-generated events. Individually, these -events contain minimal useful information and are of low value. Given the +created a surge in machine-generated events. Individually, these +events contain minimal useful information and are of low value. Given the time and resources required to extract meaning from large collections of -events, many companies were willing to discard this data instead. Although +events, many companies were willing to discard this data instead. Although infrastructure has been built to handle event-based data (e.g. IBM's Netezza\cite{singh2011introduction}, HP's Vertica\cite{bear2012vertica}, and EMC's Greenplum\cite{miner2012unified}), they are largely sold at high price points @@ -89,36 +89,36 @@ and are only targeted towards those companies who can afford the offering. A few years ago, Google introduced MapReduce \cite{dean2008mapreduce} as their mechanism of leveraging commodity hardware to index the internet and analyze -logs. The Hadoop \cite{shvachko2010hadoop} project soon followed and was +logs. The Hadoop \cite{shvachko2010hadoop} project soon followed and was largely patterned after the insights that came out of the original MapReduce paper. Hadoop is currently deployed in many organizations to store and analyze -large amounts of log data. Hadoop has contributed much to helping companies +large amounts of log data. Hadoop has contributed much to helping companies convert their low-value event streams into high-value aggregates for a variety of applications such as business intelligence and A-B testing. -As with a lot of great systems, Hadoop has opened our eyes to a new space of -problems. Specifically, Hadoop excels at storing and providing access to large +As with many great systems, Hadoop has opened our eyes to a new space of +problems. Specifically, Hadoop excels at storing and providing access to large amounts of data, however, it does not make any performance guarantees around -how quickly that data can be accessed. 
Furthermore, although Hadoop is a +how quickly that data can be accessed. Furthermore, although Hadoop is a highly available system, performance degrades under heavy concurrent load. Lastly, while Hadoop works well for storing data, it is not optimized for ingesting data and making that data immediately readable. Early on in the development of the Metamarkets product, we ran into each of these issues and came to the realization that Hadoop is a great back-office, -batch processing, and data warehousing system. However, as a company that has +batch processing, and data warehousing system. However, as a company that has product-level guarantees around query performance and data availability in a highly concurrent environment (1000+ users), Hadoop wasn't going to meet our -needs. We explored different solutions in the space, and after +needs. We explored different solutions in the space, and after trying both Relational Database Management Systems and NoSQL architectures, we came to the conclusion that there was nothing in the open source world that could be fully leveraged for our requirements. We ended up creating Druid, an -open-source, distributed, column-oriented, real-time analytical data store. In +open source, distributed, column-oriented, real-time analytical data store. In many ways, Druid shares similarities with other OLAP systems \cite{oehler2012ibm, schrader2009oracle, lachev2005applied}, interactive query systems \cite{melnik2010dremel}, main-memory databases \cite{farber2012sap}, as well as widely known distributed data stores -\cite{chang2008bigtable, decandia2007dynamo, lakshman2010cassandra}. The +\cite{chang2008bigtable, decandia2007dynamo, lakshman2010cassandra}. The distribution and query model also borrow ideas from current generation search infrastructure \cite{linkedin2013senseidb, apache2013solr, banon2013elasticsearch}. @@ -130,10 +130,10 @@ potential method of solving it. Druid is deployed in production at several technology companies\footnote{\href{http://druid.io/druid.html}{http://druid.io/druid.html}}. The structure of the paper is as follows: we first describe the problem in -Section \ref{sec:problem-definition}. Next, we detail system architecture from +Section \ref{sec:problem-definition}. Next, we detail system architecture from the point of view of how data flows through the system in Section -\ref{sec:architecture}. We then discuss how and why data gets converted into a -binary format in Section \ref{sec:storage-format}. We briefly describe the +\ref{sec:architecture}. We then discuss how and why data gets converted into a +binary format in Section \ref{sec:storage-format}. We briefly describe the query API in Section \ref{sec:query-api} and present performance results in Section \ref{sec:benchmarks}. Lastly, we leave off with our lessons from running Druid in production in Section \ref{sec:production}, and related work @@ -161,7 +161,7 @@ Druid was originally designed to solve problems around ingesting and exploring large quantities of transactional events (log data). This form of timeseries data is commonly found in OLAP workflows and the nature of the data tends to be very append heavy. For example, consider the data shown in -Table~\ref{tab:sample_data}. Table~\ref{tab:sample_data} contains data for +Table~\ref{tab:sample_data}. Table~\ref{tab:sample_data} contains data for edits that have occurred on Wikipedia. Each time a user edits a page in Wikipedia, an event is generated that contains metadata about the edit. 
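To make the running example concrete, a single Wikipedia edit event of the kind summarized in Table~\ref{tab:sample_data} could look roughly like the record below. This is an illustrative sketch only; the field names and values are hypothetical and do not correspond to an actual Druid ingestion schema.

{\small\begin{verbatim}
# One hypothetical edit event: a timestamp, several string-valued
# dimensions, and two numeric metrics.
edit_event = {
    "timestamp": "2011-01-01T01:00:00Z",
    "page": "Justin Bieber",        # dimension
    "username": "boxer",            # dimension
    "gender": "Male",               # dimension
    "city": "San Francisco",        # dimension
    "characters_added": 1800,       # metric
    "characters_removed": 25,       # metric
}
\end{verbatim}}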
This metadata is comprised of 3 distinct components. First, there is a timestamp @@ -170,7 +170,7 @@ columns indicating various attributes about the edit such as the page that was edited, the user who made the edit, and the location of the user. Finally, there are a set of metric columns that contain values (usually numeric) that can be aggregated, such as the number of characters added or removed in an -edit. +edit. Our goal is to rapidly compute drill-downs and aggregates over this data. We want to answer questions like “How many edits were made on the page Justin @@ -184,25 +184,25 @@ Relational Database Management Systems (RDBMS) and NoSQL key/value stores were unable to provide a low latency data ingestion and query platform for interactive applications \cite{tschetter2011druid}. In the early days of Metamarkets, we were focused on building a hosted dashboard that would allow -users to arbitrarily explore and visualize event streams. The data store +users to arbitrarily explore and visualize event streams. The data store powering the dashboard needed to return queries fast enough that the data visualizations built on top of it could provide users with an interactive -experience. +experience. In addition to the query latency needs, the system had to be multi-tenant and highly available. The Metamarkets product is used in a highly concurrent environment. Downtime is costly and many businesses cannot afford to wait if a system is unavailable in the face of software upgrades or network failure. Downtime for startups, who often lack proper internal operations management, can -determine business success or failure. +determine business success or failure. -Finally, another key problem that Metamarkets faced in its early days was to +Finally, another challenge that Metamarkets faced in its early days was to allow users and alerting systems to be able to make business decisions in ``real-time". The time from when an event is created to when that event is queryable determines how fast interested parties are able to react to potentially catastrophic situations in their systems. Popular open source data warehousing systems such as Hadoop were unable to provide the sub-second data -ingestion latencies we required. +ingestion latencies we required. The problems of data exploration, ingestion, and availability span multiple industries. Since Druid was open sourced in October 2012, it been deployed as a @@ -222,7 +222,7 @@ To solve complex data analysis problems, the different node types come together to form a fully working system. The name Druid comes from the Druid class in many role-playing games: it is a shape-shifter, capable of taking on many different forms to fulfill various different roles in a group. The composition -of and flow of data in a Druid cluster are shown in Figure~\ref{fig:cluster}. +of and flow of data in a Druid cluster are shown in Figure~\ref{fig:cluster}. \begin{figure*} \centering @@ -240,12 +240,12 @@ periodically hand off immutable batches of events they have collected over this small time range to other nodes in the Druid cluster that are specialized in dealing with batches of immutable events. Real-time nodes leverage Zookeeper \cite{hunt2010zookeeper} for coordination with the rest of the Druid cluster. -The nodes announce their online state and the data they are serving in -Zookeeper. +The nodes announce their online state and the data they serve in +Zookeeper. Real-time nodes maintain an in-memory index buffer for all incoming events. 
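One way to picture such a buffer is sketched below; this is an illustration with assumed names, not Druid's actual data structures. Rows are appended to a heap-resident list while simple inverted indexes from dimension values to row offsets are maintained alongside them.

{\small\begin{verbatim}
from collections import defaultdict

class InMemoryBuffer:
    """Illustrative row buffer with per-dimension-value indexes."""

    def __init__(self, dimensions, max_rows=500000):
        self.dimensions = dimensions
        self.max_rows = max_rows        # illustrative row limit
        self.rows = []                  # queryable as a row store
        self.index = defaultdict(set)   # (dimension, value) -> row ids

    def add(self, event):
        row_id = len(self.rows)
        self.rows.append(event)
        for dim in self.dimensions:
            self.index[(dim, event[dim])].add(row_id)
        # True signals that the buffer should be persisted to disk.
        return len(self.rows) >= self.max_rows

    def rows_for(self, dimension, value):
        return [self.rows[i] for i in sorted(self.index[(dimension, value)])]
\end{verbatim}}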
-These indexes are incrementally populated as new events are ingested and the -indexes are also directly queryable. Druid behaves as a row store +These indexes are incrementally populated as events are ingested and the +indexes are also directly queryable. Druid behaves as a row store for queries on events that exist in this JVM heap-based buffer. To avoid heap overflow problems, real-time nodes persist their in-memory indexes to disk either periodically or after some maximum row limit is reached. This persist @@ -255,10 +255,10 @@ index is immutable and real-time nodes load persisted indexes into off-heap memory such that they can still be queried. This process is described in detail in \cite{o1996log} and is illustrated in Figure~\ref{fig:realtime_flow}. -\begin{figure} -\centering +\begin{figure} +\centering \includegraphics[width = 2.6in]{realtime_flow} -\caption{Real-time nodes buffer events to an in-memory index, which is +\caption{Real-time nodes buffer events to an in-memory index, which is regularly persisted to disk. On a periodic basis, persisted indexes are then merged together before getting handed off. Queries will hit both the in-memory and persisted indexes. @@ -269,7 +269,7 @@ Queries will hit both the in-memory and persisted indexes. On a periodic basis, each real-time node will schedule a background task that searches for all locally persisted indexes. The task merges these indexes together and builds an immutable block of data that contains all the events -that have ingested by a real-time node for some span of time. We refer to this +that have been ingested by a real-time node for some span of time. We refer to this block of data as a ``segment". During the handoff stage, a real-time node uploads this segment to a permanent backup storage, typically a distributed file system such as S3 \cite{decandia2007dynamo} or HDFS @@ -280,20 +280,20 @@ of the processes. Figure~\ref{fig:realtime_timeline} illustrates the operations of a real-time node. The node starts at 13:37 and will only accept events for the current hour or the next hour. When events are ingested, the node announces that it is -serving a segment of data for an interval from 13:00 to 14:00. Every 10 +serving a segment of data for an interval from 13:00 to 14:00. Every 10 minutes (the persist period is configurable), the node will flush and persist -its in-memory buffer to disk. Near the end of the hour, the node will likely +its in-memory buffer to disk. Near the end of the hour, the node will likely see events for 14:00 to 15:00. When this occurs, the node prepares to serve data for the next hour and creates a new in-memory index. The node then -announces that it is also serving a segment from 14:00 to 15:00. The node does +announces that it is also serving a segment from 14:00 to 15:00. The node does not immediately merge persisted indexes from 13:00 to 14:00, instead it waits for a configurable window period for straggling events from 13:00 to 14:00 to arrive. This window period minimizes the risk of data loss from delays in event delivery. At the end of the window period, the node merges all persisted indexes from 13:00 to 14:00 into a single immutable segment and hands the -segment off. Once this segment is loaded and queryable somewhere else in the +segment off. Once this segment is loaded and queryable somewhere else in the Druid cluster, the real-time node flushes all information about the data it -collected for 13:00 to 14:00 and unannounces it is serving this data. 
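The lifecycle described above can be summarized in pseudocode. The sketch below is a simplification with assumed collaborator objects (index, clock, deep_storage, cluster) and invented names; it does not reflect Druid's actual classes or configuration parameters.

{\small\begin{verbatim}
PERSIST_PERIOD = 10 * 60   # seconds; "every 10 minutes" in the example
WINDOW_PERIOD = 10 * 60    # seconds; wait for straggling events

def serve_interval(interval, index, clock, deep_storage, cluster):
    """Illustrative ingest/persist/merge/handoff cycle for one interval."""
    cluster.announce(interval)                         # e.g. 13:00 to 14:00
    persisted = []
    last_persist = clock.now()
    while clock.now() < interval.end + WINDOW_PERIOD:
        index.ingest_available_events(interval)
        if clock.now() - last_persist >= PERSIST_PERIOD or index.at_row_limit():
            persisted.append(index.persist_to_disk())  # immutable on disk
            index.clear()
            last_persist = clock.now()
    segment = index.merge(persisted)                   # one immutable segment
    deep_storage.upload(segment)                       # e.g. S3 or HDFS
    cluster.wait_until_served_elsewhere(segment)       # a historical node loads it
    cluster.unannounce(interval)                       # stop serving this data
\end{verbatim}}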
+collected for 13:00 to 14:00 and unannounces it is serving this data. \begin{figure*} \centering @@ -306,7 +306,7 @@ real-time node operations are configurable.} \subsubsection{Availability and Scalability} Real-time nodes are a consumer of data and require a corresponding producer to -provide the data stream. Commonly, for data durability purposes, a message +provide the data stream. Commonly, for data durability purposes, a message bus such as Kafka \cite{kreps2011kafka} sits between the producer and the real-time node as shown in Figure~\ref{fig:realtime_pipeline}. Real-time nodes ingest data by reading events from the message bus. The time from event @@ -321,7 +321,7 @@ milliseconds. \end{figure} The purpose of the message bus in Figure~\ref{fig:realtime_pipeline} is -two-fold. First, the message bus acts as a buffer for incoming events. A +two-fold. First, the message bus acts as a buffer for incoming events. A message bus such as Kafka maintains positional offsets indicating how far a consumer (a real-time node) has read in an event stream. Consumers can programmatically update these offsets. Real-time nodes update this offset each @@ -337,7 +337,7 @@ multiple real-time nodes can read events. Multiple real-time nodes can ingest the same set of events from the bus, creating a replication of events. In a scenario where a node completely fails and loses disk, replicated streams ensure that no data is lost. A single ingestion endpoint also allows for data -streams for be partitioned such that multiple real-time nodes each ingest a +streams to be partitioned such that multiple real-time nodes each ingest a portion of a stream. This allows additional real-time nodes to be seamlessly added. In practice, this model has allowed one of the largest production Druid clusters to be able to consume raw data at approximately 500 MB/s (150,000 @@ -347,22 +347,22 @@ events/s or 2 TB/hour). Historical nodes encapsulate the functionality to load and serve the immutable blocks of data (segments) created by real-time nodes. In many real-world workflows, most of the data loaded in a Druid cluster is immutable and hence, -historical nodes are typically the main workers of a Druid cluster. Historical +historical nodes are typically the main workers of a Druid cluster. Historical nodes follow a shared-nothing architecture and there is no single point of contention among the nodes. The nodes have no knowledge of one another and are operationally simple; they only know how to load, drop, and serve immutable -segments. +segments. Similar to real-time nodes, historical nodes announce their online state and the data they are serving in Zookeeper. Instructions to load and drop segments are sent over Zookeeper and contain information about where the segment is -located in deep storage and how to decompress and process the segment. Before +located in deep storage and how to decompress and process the segment. Before a historical node downloads a particular segment from deep storage, it first checks a local cache that maintains information about what segments already -exist on the node. If information about a segment is not present in the cache, +exist on the node. If information about a segment is not present in the cache, the historical node will proceed to download the segment from deep storage. This process is shown in Figure~\ref{fig:historical_download}. Once processing -is complete, the segment is announced in Zookeeper. At this point, the segment +is complete, the segment is announced in Zookeeper. 
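The load path just described might be sketched as follows; helper names are again assumed rather than taken from the actual implementation.

{\small\begin{verbatim}
def load_segment(instruction, local_cache, deep_storage, zookeeper):
    # `instruction` carries the deep-storage location and processing details
    # delivered over Zookeeper; the other arguments and the helper
    # decompress_and_process() are illustrative placeholders.
    segment_id = instruction.segment_id
    if segment_id not in local_cache:
        raw = deep_storage.download(instruction.location)
        local_cache[segment_id] = decompress_and_process(raw, instruction)
    segment = local_cache[segment_id]
    segment.memory_map()               # paged in and out by the operating system
    zookeeper.announce(segment_id)     # the segment can now be queried
    return segment
\end{verbatim}}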
At this point, the segment is queryable. The local cache also allows for historical nodes to be quickly updated and restarted. On startup, the node examines its cache and immediately serves whatever data it finds. @@ -378,7 +378,7 @@ Historical nodes can support read consistency because they only deal with immutable data. Immutable data blocks also enable a simple parallelization model: historical nodes can concurrently scan and aggregate immutable blocks without blocking. - + \subsubsection{Tiers} \label{sec:tiers} Historical nodes can be grouped in different tiers, where all nodes in a @@ -394,11 +394,11 @@ can also be created with much less powerful backing hardware. The \subsubsection{Availability} Historical nodes depend on Zookeeper for segment load and unload instructions. -If Zookeeper becomes unavailable, historical nodes are no longer able to serve -new data and drop outdated data, however, because the queries are served over -HTTP, historical nodes are still be able to respond to query requests for +Should Zookeeper become unavailable, historical nodes are no longer able to serve +new data or drop outdated data, however, because the queries are served over +HTTP, historical nodes are still able to respond to query requests for the data they are currently serving. This means that Zookeeper outages do not -impact current data availability on historical nodes. +impact current data availability on historical nodes. \subsection{Broker Nodes} Broker nodes act as query routers to historical and real-time nodes. Broker @@ -406,7 +406,7 @@ nodes understand the metadata published in Zookeeper about what segments are queryable and where those segments are located. Broker nodes route incoming queries such that the queries hit the right historical or real-time nodes. Broker nodes also merge partial results from historical and real-time nodes before returning -a final consolidated result to the caller. +a final consolidated result to the caller. \subsubsection{Caching} \label{sec:caching} @@ -417,11 +417,11 @@ distributed key/value store such as Memcached first maps the query to a set of segments. Results for certain segments may already exist in the cache and there is no need to recompute them. For any results that do not exist in the cache, the broker node will forward the query -to the correct historical and real-time nodes. Once historical nodes return +to the correct historical and real-time nodes. Once historical nodes return their results, the broker will cache these results on a per segment basis for future use. This process is illustrated in Figure~\ref{fig:caching}. Real-time data is never cached and hence requests for real-time data will always be -forwarded to real-time nodes. Real-time data is perpetually changing and +forwarded to real-time nodes. Real-time data is perpetually changing and caching the results is unreliable. \begin{figure*} @@ -436,20 +436,20 @@ that all historical nodes fail, it is still possible to query results if those results already exist in the cache. \subsubsection{Availability} -In the event of a total Zookeeper outage, data is still queryable. If broker +In the event of a total Zookeeper outage, data is still queryable. If broker nodes are unable to communicate to Zookeeper, they use their last known view of the cluster and continue to forward queries to real-time and historical nodes. Broker nodes make the assumption that the structure of the cluster is the same as it was before the outage. 
In practice, this availability model has allowed our Druid cluster to continue serving queries for a significant period of time while we -diagnosed Zookeeper outages. +diagnosed Zookeeper outages. \subsection{Coordinator Nodes} Druid coordinator nodes are primarily in charge of data management and distribution on historical nodes. The coordinator nodes tell historical nodes to load new data, drop outdated data, replicate data, and move data to load balance. Druid uses a multi-version concurrency control swapping protocol for -managing immutable segments in order to maintain stable views. If any +managing immutable segments in order to maintain stable views. If any immutable segment contains data that is wholly obsoleted by newer segments, the outdated segment is dropped from the cluster. Coordinator nodes undergo a leader-election process that determines a single node that runs the coordinator @@ -472,7 +472,7 @@ Rules govern how historical segments are loaded and dropped from the cluster. Rules indicate how segments should be assigned to different historical node tiers and how many replicates of a segment should exist in each tier. Rules may also indicate when segments should be dropped entirely from the cluster. Rules -are usually set for a period of time. For example, a user may use rules to +are usually set for a period of time. For example, a user may use rules to load the most recent one month's worth of segments into a ``hot" cluster, the most recent one year's worth of segments into a ``cold" cluster, and drop any segments that are older. @@ -488,19 +488,19 @@ of segments. Since each historical node has limited resources, segments must be distributed among the cluster to ensure that the cluster load is not too imbalanced. Determining optimal load distribution requires some knowledge about query patterns and speeds. Typically, queries cover recent segments spanning -contiguous time intervals for a single data source. On average, queries that +contiguous time intervals for a single data source. On average, queries that access smaller segments are faster. These query patterns suggest replicating recent historical segments at a higher rate, spreading out large segments that are close in time to different -historical nodes, and co-locating segments from different data sources. To +historical nodes, and co-locating segments from different data sources. To optimally distribute and balance segments among the cluster, we developed a cost-based optimization procedure that takes into account the segment data source, recency, and size. The exact details of the algorithm are beyond the scope of this paper and may be discussed in future literature. \subsubsection{Replication} -Coordinator nodes may tell different historical nodes to load copies of the +Coordinator nodes may tell different historical nodes to load a copy of the same segment. The number of replicates in each tier of the historical compute cluster is fully configurable. Setups that require high levels of fault tolerance can be configured to have a high number of replicas. Replicated @@ -513,7 +513,7 @@ cluster. Over the last two years, we have never taken downtime in our Druid cluster for software upgrades. \subsubsection{Availability} -Druid coordinator nodes have two external dependencies: Zookeeper and MySQL. +Druid coordinator nodes have Zookeeper and MySQL as external dependencies. Coordinator nodes rely on Zookeeper to determine what historical nodes already exist in the cluster. 
If Zookeeper becomes unavailable, the coordinator will no longer be able to send instructions to assign, balance, and drop segments. @@ -523,7 +523,7 @@ The design principle for responding to MySQL and Zookeeper failures is the same: if an external dependency responsible for coordination fails, the cluster maintains the status quo. Druid uses MySQL to store operational management information and segment metadata information about what segments should exist -in the cluster. If MySQL goes down, this information becomes unavailable to +in the cluster. If MySQL goes down, this information becomes unavailable to coordinator nodes. However, this does not mean data itself is unavailable. If coordinator nodes cannot communicate to MySQL, they will cease to assign new segments and drop outdated ones. Broker, historical, and real-time nodes are still @@ -537,7 +537,7 @@ is typically 5--10 million rows. Formally, we define a segment as a collection of rows of data that span some period of time. Segments represent the fundamental storage unit in Druid and replication and distribution are done at a segment level. - + Druid always requires a timestamp column as a method of simplifying data distribution policies, data retention policies, and first-level query pruning. Druid partitions its data sources into well-defined time intervals, typically @@ -549,16 +549,16 @@ with timestamps spread over a day is better partitioned by hour. Segments are uniquely identified by a data source identifer, the time interval of the data, and a version string that increases whenever a new segment is -created. The version string indicates the freshness of segment data; segments +created. The version string indicates the freshness of segment data; segments with later versions have newer views of data (over some time range) than -segments with older versions. This segment metadata is used by the system for +segments with older versions. This segment metadata is used by the system for concurrency control; read operations always access data in a particular time range from the segments with the latest version identifiers for that time range. Druid segments are stored in a column orientation. Given that Druid is best used for aggregating event streams (all data going into Druid must have a -timestamp), the advantages storing aggregate information as columns rather than +timestamp), the advantages of storing aggregate information as columns rather than rows are well documented \cite{abadi2008column}. Column storage allows for more efficient CPU usage as only what is needed is actually loaded and scanned. In a row oriented data store, all columns associated with a row must be scanned as @@ -573,7 +573,7 @@ contain strings. Storing strings directly is unnecessarily costly and string columns can be dictionary encoded instead. Dictionary encoding is a common method to compress data and has been used in other data stores such as PowerDrill \cite{hall2012processing}. In the example in -Table~\ref{tab:sample_data}, we can map each page to an unique integer +Table~\ref{tab:sample_data}, we can map each page to a unique integer identifier. {\small\begin{verbatim} Justin Bieber -> 0 @@ -607,7 +607,7 @@ representations. In many real world OLAP workflows, queries are issued for the aggregated results of some set of metrics where some set of dimension specifications are met. An example query is: ``How many Wikipedia edits were done by users in -San Francisco who are also male?". 
This query is filtering the Wikipedia data +San Francisco who are also male?" This query is filtering the Wikipedia data set in Table~\ref{tab:sample_data} based on a Boolean expression of dimension values. In many real world data sets, dimension columns contain strings and metric columns contain numeric values. Druid creates additional lookup @@ -657,9 +657,9 @@ resorted the data set rows to maximize compression. In the unsorted case, the total Concise size was 53,451,144 bytes and the total integer array size was 127,248,520 bytes. Overall, Concise compressed sets are -about 42\% smaller than integer arrays. In the sorted case, the total Concise +about 42\% smaller than integer arrays. In the sorted case, the total Concise compressed size was 43,832,884 bytes and the total integer array size was -127,248,520 bytes. What is interesting to note is that after sorting, global +127,248,520 bytes. What is interesting to note is that after sorting, global compression only increased minimally. \subsection{Storage Engine} @@ -673,7 +673,7 @@ memory-mapped storage engine but could be a better alternative if performance is critical. By default, a memory-mapped storage engine is used. When using a memory-mapped storage engine, Druid relies on the operating system -to page segments in and out of memory. Given that segments can only be scanned +to page segments in and out of memory. Given that segments can only be scanned if they are loaded in memory, a memory-mapped storage engine allows recent segments to retain in memory whereas segments that are never queried are paged out. The main drawback with using the memory-mapped storage engine is when a @@ -694,8 +694,8 @@ JSON object containing the aggregated metrics over the time period. Most query types will also support a filter set. A filter set is a Boolean expression of dimension name and value pairs. Any number and combination of -dimensions and values may be specified. When a filter set is provided, only -the subset of the data that pertains to the filter set will be scanned. The +dimensions and values may be specified. When a filter set is provided, only +the subset of the data that pertains to the filter set will be scanned. The ability to handle complex nested filter sets is what enables Druid to drill into data at any depth. @@ -716,7 +716,7 @@ A sample count query over a week of data is as follows: } \end{verbatim}} The query shown above will return a count of the number of rows in the Wikipedia data source -from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the ``page" dimension is +from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the ``page" dimension is equal to ``Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form: {\scriptsize\begin{verbatim} [ { @@ -734,19 +734,19 @@ equal to ``Ke\$ha". The results will be bucketed by day and will be a JSON array } ] \end{verbatim}} -Druid supports many types of aggregations including double sums, long sums, +Druid supports many types of aggregations including sums on floating-point and integer types, minimums, maximums, and complex aggregations such as cardinality estimation and -approximate quantile estimation. The results of aggregations can be combined +approximate quantile estimation. The results of aggregations can be combined in mathematical expressions to form other aggregations. 
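To tie the query API back to the storage format of Section~\ref{sec:storage-format}, the sketch below shows how a filtered, time-bucketed count might be evaluated against dictionary-encoded columns and per-value bitmaps. It is a toy illustration that uses Python sets in place of Concise bitmaps and is not Druid's execution engine.

{\small\begin{verbatim}
rows = [
    {"day": "2013-01-01", "page": "Justin Bieber"},
    {"day": "2013-01-01", "page": "Ke$ha"},
    {"day": "2013-01-02", "page": "Ke$ha"},
    {"day": "2013-01-02", "page": "Justin Bieber"},
]

# Dictionary encoding plus one "bitmap" (here a set of row ids) per value.
dictionary = {"Justin Bieber": 0, "Ke$ha": 1}
page_column = [dictionary[r["page"]] for r in rows]      # [0, 1, 1, 0]
bitmaps = {v: {i for i, x in enumerate(page_column) if x == v}
           for v in dictionary.values()}

# Filter page = "Ke$ha", then count the matching rows in each day bucket.
# A multi-value filter would OR the per-value sets; AND combines dimensions.
matching = bitmaps[dictionary["Ke$ha"]]
counts = {}
for row_id in matching:
    day = rows[row_id]["day"]
    counts[day] = counts.get(day, 0) + 1

print(counts)   # one matching row on each of the two days
\end{verbatim}}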
It is beyond the scope of this paper to fully describe the query API but more information can be found online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}. -As of this writing, a join query for Druid is not yet implemented. This has +As of this writing, a join query for Druid is not yet implemented. This has been a function of engineering resource allocation and use case decisions more -than a decision driven by technical merit. Indeed, Druid's storage format +than a decision driven by technical merit. Indeed, Druid's storage format would allow for the implementation of joins (there is no loss of fidelity for columns included as dimensions) and the implementation of them has been a -conversation that we have every few months. To date, we have made the choice +conversation that we have every few months. To date, we have made the choice that the implementation cost is not worth the investment for our organization. The reasons for this decision are generally two-fold. @@ -756,30 +756,29 @@ The reasons for this decision are generally two-fold. \end{enumerate} A join query is essentially the merging of two or more streams of data based on -a shared set of keys. The primary high-level strategies for join queries the -authors are aware of are a hash-based strategy or a sorted-merge strategy. The +a shared set of keys. The primary high-level strategies for join queries we +are aware of are a hash-based strategy or a sorted-merge strategy. The hash-based strategy requires that all but one data set be available as something that looks like a hash table, a lookup operation is then performed on -this hash table for every row in the ``primary" stream. The sorted-merge +this hash table for every row in the ``primary" stream. The sorted-merge strategy assumes that each stream is sorted by the join key and thus allows for -the incremental joining of the streams. Each of these strategies, however, +the incremental joining of the streams. Each of these strategies, however, requires the materialization of some number of the streams either in sorted -order or in a hash table form. +order or in a hash table form. When all sides of the join are significantly large tables (> 1 billion records), materializing the pre-join streams requires complex distributed memory -management. The complexity of the memory management is only amplified by +management. The complexity of the memory management is only amplified by the fact that we are targeting highly concurrent, multitenant workloads. -This is, as far as the authors are aware, an active academic research -problem that we would be more than willing to engage with the academic -community to help resolving in a scalable manner. +This is, as far as we are aware, an active academic research +problem that we would be willing to help resolve in a scalable manner. \section{Performance} \label{sec:benchmarks} Druid runs in production at several organizations, and to demonstrate its performance, we have chosen to share some real world numbers for the main production -cluster running at Metamarkets in early 2014. For comparison with other databases +cluster running at Metamarkets as of early 2014. For comparison with other databases we also include results from synthetic workloads on TPC-H data. \subsection{Query Performance in Production} @@ -789,7 +788,7 @@ based on a given metric is much more expensive than a simple count over a time range. 
To showcase the average query latencies in a production Druid cluster, we selected 8 of our most queried data sources, described in Table~\ref{tab:datasources}. -Approximately 30\% of the queries are standard +Approximately 30\% of queries are standard aggregates involving different types of metrics and filters, 60\% of queries are ordered group bys over one or more dimensions with aggregates, and 10\% of queries are search queries and metadata retrieval queries. The number of @@ -827,7 +826,7 @@ approximately 10TB of segments loaded. Collectively, there are about 50 billion Druid rows in this tier. Results for every data source are not shown. -\item The hot tier uses Intel Xeon E5-2670 processors and consists of 1302 processing +\item The hot tier uses Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2670 processors and consists of 1302 processing threads and 672 total cores (hyperthreaded). \item A memory-mapped storage engine was used (the machine was configured to @@ -838,28 +837,28 @@ Query latencies are shown in Figure~\ref{fig:query_latency} and the queries per minute are shown in Figure~\ref{fig:queries_per_min}. Across all the various data sources, average query latency is approximately 550 milliseconds, with 90\% of queries returning in less than 1 second, 95\% in under 2 seconds, and -99\% of queries returning in less than 10 seconds. Occasionally we observe +99\% of queries returning in less than 10 seconds. Occasionally we observe spikes in latency, as observed on February 19, where network issues on the Memcached instances were compounded by very high query load on one of our largest data sources. \begin{figure} -\centering +\centering \includegraphics[width = 2.3in]{avg_query_latency} \includegraphics[width = 2.3in]{query_percentiles} -\caption{Query latencies of production data sources.} +\caption{Query latencies of production data sources.} \label{fig:query_latency} \end{figure} \begin{figure} -\centering +\centering \includegraphics[width = 2.8in]{queries_per_min} -\caption{Queries per minute of production data sources.} +\caption{Queries per minute of production data sources.} \label{fig:queries_per_min} \end{figure} \subsection{Query Benchmarks on TPC-H Data} -We also present Druid benchmarks on TPC-H data. +We also present Druid benchmarks on TPC-H data. Most TPC-H queries do not directly apply to Druid, so we selected queries more typical of Druid's workload to demonstrate query performance. As a comparison, we also provide the results of the same queries using MySQL using the @@ -871,8 +870,8 @@ open source column store because we were not confident we could correctly tune it for optimal performance. Our Druid setup used Amazon EC2 -\texttt{m3.2xlarge} (Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz) instances for -historical nodes and \texttt{c3.2xlarge} (Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz) instances for broker +\texttt{m3.2xlarge} instance types (Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2680 v2 @ 2.80GHz) for +historical nodes and \texttt{c3.2xlarge} instances (Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2670 v2 @ 2.50GHz) for broker nodes. Our MySQL setup was an Amazon RDS instance that ran on the same \texttt{m3.2xlarge} instance type. The results for the 1 GB TPC-H data set are shown @@ -918,7 +917,7 @@ well. To showcase Druid's data ingestion latency, we selected several production datasources of varying dimensions, metrics, and event volumes. 
Our production ingestion setup consists of 6 nodes, totalling 360GB of RAM and 96 cores -(12 x Intel Xeon E5-2670). +(12 x Intel\textsuperscript\textregistered Xeon\textsuperscript\textregistered E5-2670). Note that in this setup, several other data sources were being ingested and many other Druid related ingestion tasks were running concurrently on the machines. @@ -931,7 +930,7 @@ aggregations we want to perform on those metrics. With the most basic data set 800,000 events/second/core, which is really just a measurement of how fast we can deserialize events. Real world data sets are never this simple. Table~\ref{tab:ingest_datasources} shows a selection of data sources and their -characteristics. +characteristics. \begin{table} \centering @@ -974,9 +973,9 @@ running an Amazon \texttt{cc2.8xlarge} instance. The latency measurements we presented are sufficient to address the stated problems of interactivity. We would prefer the variability in the latencies to -be less. It is still very possible to decrease latencies by adding +be less. It is still possible to decrease latencies by adding additional hardware, but we have not chosen to do so because infrastructure -costs are still a consideration to us. +costs are still a consideration for us. \section{Druid in Production}\label{sec:production} Over the last few years, we have gained tremendous knowledge about handling @@ -988,8 +987,8 @@ explore use case, the number of queries issued by a single user are much higher than in the reporting use case. Exploratory queries often involve progressively adding filters for the same time range to narrow down results. Users tend to explore short time intervals of recent data. In the generate report use case, -users query for much longer data intervals, but users also already know the -queries they want to issue. +users query for much longer data intervals, but those queries are generally few +and pre-determined. \paragraph{Multitenancy} Expensive concurrent queries can be problematic in a multitenant @@ -1000,26 +999,26 @@ to address these issues. Each historical node is able to prioritize which segments it needs to scan. Proper query planning is critical for production workloads. Thankfully, queries for a significant amount of data tend to be for reporting use cases and can be deprioritized. Users do not expect the same level of -interactivity in this use case as when they are exploring data. +interactivity in this use case as when they are exploring data. \paragraph{Node failures} Single node failures are common in distributed environments, but many nodes failing at once are not. If historical nodes completely fail and do not -recover, their segments need to reassigned, which means we need excess cluster +recover, their segments need to be reassigned, which means we need excess cluster capacity to load this data. The amount of additional capacity to have at any time contributes to the cost of running a cluster. From our experiences, it is extremely rare to see more than 2 nodes completely fail at once and hence, we leave enough capacity in our cluster to completely reassign the data from 2 -historical nodes. +historical nodes. \paragraph{Data Center Outages} Complete cluster failures are possible, but extremely rare. If Druid is only deployed in a single data center, it is possible for the entire data center to fail. In such cases, new machines need to be provisioned. 
As long as -deep storage is still available, cluster recovery time is network bound as +deep storage is still available, cluster recovery time is network bound, as historical nodes simply need to redownload every segment from deep storage. We -have experienced such failures in the past, and the recovery time was around -several hours in the AWS ecosystem for several TBs of data. +have experienced such failures in the past, and the recovery time was +several hours in the Amazon AWS ecosystem for several terabytes of data. \subsection{Operational Monitoring} Proper monitoring is critical to run a large scale distributed cluster. @@ -1035,20 +1034,20 @@ performance and stability of the production cluster. This dedicated metrics cluster has allowed us to find numerous production problems, such as gradual query speed degregations, less than optimally tuned hardware, and various other system bottlenecks. We also use a metrics cluster to analyze what queries are -made in production and what users are most interested in. +made in production and what aspects of the data users are most interested in. \subsection{Pairing Druid with a Stream Processor} -At the time of writing, Druid can only understand fully denormalized data +Currently, Druid can only understand fully denormalized data streams. In order to provide full business logic in production, Druid can be paired with a stream processor such as Apache Storm \cite{marz2013storm}. A Storm topology consumes events from a data stream, retains only those that are “on-time”, and applies any relevant business logic. This could range from -simple transformations, such as id to name lookups, up to complex operations +simple transformations, such as id to name lookups, to complex operations such as multi-stream joins. The Storm topology forwards the processed event stream to Druid in real-time. Storm handles the streaming data processing work, and Druid is used for responding to queries for both real-time and -historical data. +historical data. \subsection{Multiple Data Center Distribution} Large scale production outages may not only affect single nodes, but entire @@ -1058,13 +1057,13 @@ exactly replicated across historical nodes in multiple data centers. Similarily, query preference can be assigned to different tiers. It is possible to have nodes in one data center act as a primary cluster (and receive all queries) and have a redundant cluster in another data center. Such a setup may -be desired if one data center is situated much closer to users. +be desired if one data center is situated much closer to users. \section{Related Work} \label{sec:related} Cattell \cite{cattell2011scalable} maintains a great summary about existing Scalable SQL and NoSQL data stores. Hu \cite{hu2011stream} contributed another -great summary for streaming databases. Druid feature-wise sits somewhere +great summary for streaming databases. Druid feature-wise sits somewhere between Google’s Dremel \cite{melnik2010dremel} and PowerDrill \cite{hall2012processing}. Druid has most of the features implemented in Dremel (Dremel handles arbitrary nested data structures while Druid only allows for a @@ -1074,15 +1073,15 @@ algorithms mentioned in PowerDrill. Although Druid builds on many of the same principles as other distributed columnar data stores \cite{fink2012distributed}, many of these data stores are designed to be more generic key-value stores \cite{lakshman2010cassandra} and do not -support computation directly in the storage layer. 
There are also other data -stores designed for some of the same of the data warehousing issues that Druid -is meant to solve. These systems include include in-memory databases such as +support computation directly in the storage layer. There are also other data +stores designed for some of the same data warehousing issues that Druid +is meant to solve. These systems include in-memory databases such as SAP’s HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb}. These data stores lack Druid's low latency ingestion characteristics. Druid also has -native analytical features baked in, similar to \cite{paraccel2013}, however, -Druid allows system wide rolling software updates with no downtime. +native analytical features baked in, similar to ParAccel \cite{paraccel2013}, however, +Druid allows system wide rolling software updates with no downtime. -Druid is similiar to \cite{stonebraker2005c, cipar2012lazybase} in that it has +Druid is similiar to C-Store \cite{stonebraker2005c} and LazyBase \cite{cipar2012lazybase} in that it has two subsystems, a read-optimized subsystem in the historical nodes and a write-optimized subsystem in real-time nodes. Real-time nodes are designed to ingest a high volume of append heavy data, and do not support data updates. @@ -1090,14 +1089,14 @@ Unlike the two aforementioned systems, Druid is meant for OLAP transactions and not OLTP transactions. Druid's low latency data ingestion features share some similarities with -Trident/Storm \cite{marz2013storm} and Streaming Spark +Trident/Storm \cite{marz2013storm} and Spark Streaming \cite{zaharia2012discretized}, however, both systems are focused on stream processing whereas Druid is focused on ingestion and aggregation. Stream processors are great complements to Druid as a means of pre-processing the data before the data enters Druid. There are a class of systems that specialize in queries on top of cluster -computing frameworks. Shark \cite{engle2012shark} is such a system for queries +computing frameworks. Shark \cite{engle2012shark} is such a system for queries on top of Spark, and Cloudera's Impala \cite{cloudera2013} is another system focused on optimizing query performance on top of HDFS. Druid historical nodes download data locally and only work with native Druid indexes. We believe this @@ -1111,7 +1110,7 @@ stores \cite{macnicol2004sybase}. \section{Conclusions} \label{sec:conclusions} -In this paper, we presented Druid, a distributed, column-oriented, real-time +In this paper we presented Druid, a distributed, column-oriented, real-time analytical data store. Druid is designed to power high performance applications and is optimized for low query latencies. Druid supports streaming data ingestion and is fault-tolerant. We discussed Druid benchmarks and @@ -1123,8 +1122,8 @@ as the storage format, query language, and general execution. \section{Acknowledgements} \label{sec:acknowledgements} Druid could not have been built without the help of many great engineers at -Metamarkets and in the community. We want to thank everyone that has -contributed to the Druid codebase for their invaluable support. +Metamarkets and in the community. We want to thank everyone that has +contributed to the Druid codebase for their invaluable support. % The following two commands are all you need in the % initial runs of your .tex file to @@ -1136,7 +1135,7 @@ contributed to the Druid codebase for their invaluable support. % latex bibtex latex latex % to resolve all references -%Generated by bibtex from your ~.bib file. 
Run latex, +%Generated by bibtex from your ~.bib file. Run latex, %then bibtex, then latex twice (to resolve references). %APPENDIX is optional.