clean up examples, finish paper

fjy 2014-03-13 18:52:08 -07:00
parent f00ffe4789
commit b4f1591260
14 changed files with 168 additions and 42 deletions

View File

@ -152,10 +152,7 @@ The indexing service can also run real-time tasks. These tasks effectively trans
"intermediatePersistPeriod": "PT10m"
},
"windowPeriod": "PT10m",
"segmentGranularity": "hour",
"rejectionPolicy": {
"type": "messageTime"
}
"segmentGranularity": "hour"
}
```

View File

@ -160,13 +160,15 @@ You should be comfortable starting Druid nodes at this point. If not, it may be
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "none"
"type": "test"
}
}
}
]
```
Note: This config uses a "test" rejection policy, which accepts all events and hands data off in a timely manner; however, we strongly recommend against using it in production, since segments for events in the same time range will be overwritten.
3. Let's copy and paste some data into the Kafka console producer
```json

View File

@ -251,6 +251,9 @@ druid.publish.type=noop
# druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
```
Next Steps

View File

@ -53,7 +53,7 @@
"segmentGranularity": "hour",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
"rejectionPolicy": {
"type": "messageTime"
"type": "test"
}
}
}

View File

@ -43,6 +43,6 @@
"windowPeriod" : "PT5m",
"segmentGranularity":"hour",
"basePersistDirectory" : "/tmp/realtime/basePersist",
"rejectionPolicy": { "type": "messageTime" }
"rejectionPolicy": { "type": "test" }
}
}]

View File

@ -16,3 +16,5 @@ druid.publish.type=noop
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]

Binary file not shown.

View File

@ -122,7 +122,6 @@ edit.
\begin{table*}
\centering
\label{tab:sample_data}
\begin{tabular}{| l | l | l | l | l | l | l |}
\hline
\textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline
@ -132,6 +131,7 @@ edit.
2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline
\end{tabular}
\caption{Sample Druid data for edits that have occurred on Wikipedia.}
\label{tab:sample_data}
\end{table*}
Our goal is to rapidly compute drill-downs and aggregates over this data. We
@ -160,7 +160,7 @@ determine business success or failure.
Finally, another key problem that Metamarkets faced in its early days was to
allow users and alerting systems to make business decisions in
"real-time". The time from when an event is created to when that
``real-time". The time from when an event is created to when that
event is queryable determines how fast users and systems are able to react to
potentially catastrophic occurrences in their systems. Popular open source data
warehousing systems such as Hadoop were unable to provide the sub-second data ingestion
@ -177,7 +177,7 @@ A Druid cluster consists of different types of nodes and each node type is
designed to perform a specific set of tasks. We believe this design separates
concerns and simplifies the complexity of the system. The different node types
operate fairly independently of each other and there is minimal interaction
between them. Hence, intra-cluster communication failures have minimal impact
among them. Hence, intra-cluster communication failures have minimal impact
on data availability. To solve complex data analysis problems, the different
node types come together to form a fully working system. The name Druid comes
from the Druid class in many role-playing games: it is a shape-shifter, capable
@ -231,10 +231,10 @@ On a periodic basis, each real-time node will schedule a background task that
searches for all locally persisted indexes. The task merges these indexes
together and builds an immutable block of data that contains all the events
that have been ingested by a real-time node for some span of time. We refer to this
block of data as a "segment". During the handoff stage, a real-time node
block of data as a ``segment". During the handoff stage, a real-time node
uploads this segment to a permanent backup storage, typically a distributed
file system such as S3 \cite{decandia2007dynamo} or HDFS
\cite{shvachko2010hadoop}, which Druid refers to as "deep storage". The ingest,
\cite{shvachko2010hadoop}, which Druid refers to as ``deep storage". The ingest,
persist, merge, and handoff steps are fluid; there is no data loss during any
of the processes.
@ -260,7 +260,7 @@ collected for 13:00 to 14:00 and unannounces it is serving this data.
\centering
\includegraphics[width = 4.5in]{realtime_timeline}
\caption{The node starts, ingests data, persists, and periodically hands data
off. This process repeats indefinitely. The time intervals between different
off. This process repeats indefinitely. The time periods between different
real-time node operations are configurable.}
\label{fig:realtime_timeline}
\end{figure*}
@ -436,8 +436,8 @@ Rules indicate how segments should be assigned to different historical node
tiers and how many replicas of a segment should exist in each tier. Rules may
also indicate when segments should be dropped entirely from the cluster. Rules
are usually set for a period of time. For example, a user may use rules to
load the most recent one month's worth of segments into a "hot" cluster, the
most recent one year's worth of segments into a "cold" cluster, and drop any
load the most recent one month's worth of segments into a ``hot" cluster, the
most recent one year's worth of segments into a ``cold" cluster, and drop any
segments that are older.
The coordinator nodes load a set of rules from a rule table in the MySQL
@ -569,7 +569,7 @@ representations.
\subsection{Indices for Filtering Data}
In many real world OLAP workflows, queries are issued for the aggregated
results of some set of metrics where some set of dimension specifications are
met. An example query is: "How many Wikipedia edits were done by users in
met. An example query is: ``How many Wikipedia edits were done by users in
San Francisco who are also male?". This query is filtering the Wikipedia data
set in Table~\ref{tab:sample_data} based on a Boolean expression of dimension
values. In many real world data sets, dimension columns contain strings and
@ -609,12 +609,11 @@ used in search engines. Bitmap indices for OLAP workloads is described in
detail in \cite{o1997improved}. Bitmap compression algorithms are a
well-defined area of research \cite{antoshenkov1995byte, wu2006optimizing,
van2011memory} and often utilize run-length encoding. Druid opted to use the
Concise algorithm \cite{colantonio2010concise} as it can outperform WAH by
reducing compressed bitmap size by up to 50\%. Figure~\ref{fig:concise_plot}
Concise algorithm \cite{colantonio2010concise}. Figure~\ref{fig:concise_plot}
illustrates the number of bytes using Concise compression versus using an
integer array. The results were generated on a \texttt{cc2.8xlarge} system with a single
thread, 2G heap, 512m young gen, and a forced GC between each run. The data set
is a single day's worth of data collected from the Twitter garden hose
integer array. The results were generated on a \texttt{cc2.8xlarge} system with
a single thread, 2G heap, 512m young gen, and a forced GC between each run. The
data set is a single day's worth of data collected from the Twitter garden hose
\cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12
dimensions of varying cardinality. As an additional comparison, we also
resorted the data set rows to maximize compression.
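
The bitmap discussion above is easy to make concrete. The sketch below assumes nothing beyond java.util.BitSet and some invented row offsets (it is not Druid's Concise implementation); it shows how a filter such as "edits by users in San Francisco who are also male" reduces to a bitwise AND over per-value row bitmaps.
```java
import java.util.BitSet;

public class BitmapFilterSketch
{
  public static void main(String[] args)
  {
    // Hypothetical inverted indexes: for each dimension value, the set of row
    // offsets where that value occurs (offsets are invented for illustration).
    BitSet citySanFrancisco = new BitSet();
    citySanFrancisco.set(0);
    citySanFrancisco.set(2);
    citySanFrancisco.set(5);

    BitSet genderMale = new BitSet();
    genderMale.set(2);
    genderMale.set(3);
    genderMale.set(5);

    // "city = San Francisco AND gender = Male" is the intersection of the two
    // bitmaps; the surviving bits are the row offsets to aggregate over.
    BitSet matching = (BitSet) citySanFrancisco.clone();
    matching.and(genderMale);

    System.out.println(matching); // {2, 5}
  }
}
```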
@ -680,8 +679,8 @@ A sample count query over a week of data is as follows:
}
\end{verbatim}}
The query shown above will return a count of the number of rows in the Wikipedia datasource
from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the "page" dimension is
equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the ``page" dimension is
equal to ``Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
{\scriptsize\begin{verbatim}
[ {
"timestamp": "2012-01-01T00:00:00.000Z",
@ -706,7 +705,7 @@ of this paper to fully describe the query API but more information can be found
online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}.
As of this writing, a join query for Druid is not yet implemented. This has
been a function of engineering resource allocation decisions and use case more
been a function of engineering resource allocation and use case decisions more
than a decision driven by technical merit. Indeed, Druid's storage format
would allow for the implementation of joins (there is no loss of fidelity for
columns included as dimensions) and the implementation of them has been a
@ -724,7 +723,7 @@ a shared set of keys. The primary high-level strategies for join queries the
authors are aware of are a hash-based strategy or a sorted-merge strategy. The
hash-based strategy requires that all but one data set be available as
something that looks like a hash table; a lookup operation is then performed on
this hash table for every row in the "primary" stream. The sorted-merge
this hash table for every row in the ``primary" stream. The sorted-merge
strategy assumes that each stream is sorted by the join key and thus allows for
the incremental joining of the streams. Each of these strategies, however,
requires the materialization of some number of the streams either in sorted
@ -751,8 +750,7 @@ Druid query performance can vary significantly depending on the query
being issued. For example, sorting the values of a high cardinality dimension
based on a given metric is much more expensive than a simple count over a time
range. To showcase the average query latencies in a production Druid cluster,
we selected 8 of our most queried data sources, described in
Table~\ref{tab:datasources}.
we selected 8 of our most queried data sources, described in Table~\ref{tab:datasources}.
Approximately 30\% of the queries are standard
aggregates involving different types of metrics and filters, 60\% of queries
@ -764,7 +762,6 @@ involving all columns are very rare.
\begin{table}
\centering
\label{tab:datasources}
\begin{tabular}{| l | l | l |}
\hline
\textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline
@ -778,14 +775,15 @@ involving all columns are very rare.
\texttt{h} & 78 & 14 \\ \hline
\end{tabular}
\caption{Characteristics of production data sources.}
\label{tab:datasources}
\end{table}
A few notes about our results:
\begin{itemize}[leftmargin=*,beginpenalty=5000,topsep=0pt]
\item The results are from a "hot" tier in our production cluster. We run
\item The results are from a ``hot" tier in our production cluster. We run
several tiers of varying performance in production.
\item There is approximately 10.5TB of RAM available in the "hot" tier and
\item There is approximately 10.5TB of RAM available in the ``hot" tier and
approximately 10TB of segments loaded (including replication). Collectively,
there are about 50 billion Druid rows in this tier. Results for
every data source are not shown.
@ -798,12 +796,12 @@ threads and 672 total cores (hyperthreaded).
\end{itemize}
Query latencies are shown in Figure~\ref{fig:query_latency} and the queries per
minute is shown in Figure~\ref{fig:queries_per_min}. Across all the various
minute are shown in Figure~\ref{fig:queries_per_min}. Across all the various
data sources, average query latency is approximately 550 milliseconds, with
90\% of queries returning in less than 1 second, 95\% in under 2 seconds, and
99\% of queries taking less than 10 seconds to complete.
99\% of queries returning in less than 10 seconds.
Occasionally we observe spikes in latency, as seen on February 19,
in which case network issues on the cache nodes were compounded by very high
in which case network issues on the Memcached instances were compounded by very high
query load on one of our largest datasources.
\begin{figure}
@ -893,11 +891,10 @@ aggregations we want to perform on those metrics. With the most basic data set
800,000 events/second/core, which is really just a measurement of how fast we can
deserialize events. Real world data sets are never this simple.
Table~\ref{tab:ingest_datasources} shows a selection of data sources and their
chracteristics.
characteristics.
\begin{table}
\centering
\label{tab:ingest_datasources}
\begin{tabular}{| l | l | l | l |}
\hline
\scriptsize\textbf{Data Source} & \scriptsize\textbf{Dimensions} & \scriptsize\textbf{Metrics} & \scriptsize\textbf{Peak events/s} \\ \hline
@ -911,6 +908,7 @@ chracteristics.
\texttt{z} & 33 & 24 & 95747.74 \\ \hline
\end{tabular}
\caption{Ingestion characteristics of various data sources.}
\label{tab:ingest_datasources}
\end{table}
We can see that, based on the descriptions in
@ -938,7 +936,7 @@ The latency measurements we presented are sufficient to address our stated
problems of interactivity. We would prefer less variability in the latencies.
It is still possible to decrease latencies by adding
additional hardware, but we have not chosen to do so because infrastructure
cost is still a consideration to us.
costs are still a consideration to us.
\section{Druid in Production}\label{sec:production}
Over the last few years, we have gained tremendous knowledge about handling
@ -976,7 +974,7 @@ historical nodes.
\paragraph{Data Center Outages}
Complete cluster failures are possible, but extremely rare. If Druid is
deployed only in a single data center, it is possible for the entire data
only deployed in a single data center, it is possible for the entire data
center to fail. In such cases, new machines need to be provisioned. As long as
deep storage is still available, cluster recovery time is network bound as
historical nodes simply need to redownload every segment from deep storage. We
@ -1076,7 +1074,7 @@ stores \cite{macnicol2004sybase}.
In this paper, we presented Druid, a distributed, column-oriented, real-time
analytical data store. Druid is designed to power high performance applications
and is optimized for low query latencies. Druid supports streaming data
ingestion and is fault-tolerant. We discussed how Druid benchmarks and
ingestion and is fault-tolerant. We discussed Druid benchmarks and
summarized key architecture aspects such
as the storage format, query language, and general execution.

View File

@ -0,0 +1,46 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import org.joda.time.Interval;
/**
 * A VersioningPolicy that returns a fixed, user-supplied version string for
 * every interval; if no version is given, it defaults to the policy's creation time.
 */
public class CustomVersioningPolicy implements VersioningPolicy
{
private final String version;
@JsonCreator
public CustomVersioningPolicy(
@JsonProperty("version") String version
)
{
this.version = version == null ? new DateTime().toString() : version;
}
@Override
public String getVersion(Interval interval)
{
return version;
}
}
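
As a rough illustration of how this policy might be exercised, the sketch below deserializes it through a plain Jackson ObjectMapper using the "custom" type name registered on VersioningPolicy later in this commit. The example class, JSON string, and interval are invented; in a real deployment Druid's own configured mapper reads the spec.
```java
package io.druid.segment.realtime.plumber;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Interval;

// Placed in the same package as CustomVersioningPolicy so no Druid imports are needed.
public class CustomVersioningPolicyExample
{
  public static void main(String[] args) throws Exception
  {
    // The @JsonTypeInfo/@JsonSubTypes annotations on VersioningPolicy let a
    // plain ObjectMapper resolve the "custom" type name to CustomVersioningPolicy.
    ObjectMapper mapper = new ObjectMapper();
    VersioningPolicy policy = mapper.readValue(
        "{\"type\": \"custom\", \"version\": \"2014-03-13T00:00:00.000Z\"}",
        VersioningPolicy.class
    );

    // The interval is ignored; every segment receives the fixed version.
    Interval interval = new Interval(
        new DateTime("2014-03-13T00:00:00Z"),
        new DateTime("2014-03-14T00:00:00Z")
    );
    System.out.println(policy.getVersion(interval)); // 2014-03-13T00:00:00.000Z
  }
}
```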

View File

@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.plumber;
import com.google.common.collect.Lists;

View File

@ -578,9 +578,12 @@ public class RealtimePlumber implements Plumber
log.info("Starting merge and push.");
long minTimestamp = segmentGranularity.truncate(
DateTime minTimestampAsDate = segmentGranularity.truncate(
rejectionPolicy.getCurrMaxTime().minus(windowMillis)
).getMillis();
);
long minTimestamp = minTimestampAsDate.getMillis();
log.info("Found [%,d] sinks. minTimestamp [%s]", sinks.size(), minTimestampAsDate);
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
@ -588,9 +591,13 @@ public class RealtimePlumber implements Plumber
if (intervalStart < minTimestamp) {
log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry);
} else {
log.warn("[%s] < [%s] Skipping persist and merge.", new DateTime(intervalStart), minTimestampAsDate);
}
}
log.info("Found [%,d] sinks to persist and merge", sinksToPush.size());
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
persistAndMerge(entry.getKey(), entry.getValue());
}
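
For a concrete sense of the cutoff computed above, here is a small worked example with assumed values: hourly segment granularity, a ten-minute windowPeriod, and an invented current max time of 14:25 UTC. Joda-Time's hour-floor rounding stands in for segmentGranularity.truncate(...).
```java
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;

public class MergeCutoffExample
{
  public static void main(String[] args)
  {
    // Assumed values: a ten-minute windowPeriod and a current max time of 14:25 UTC.
    long windowMillis = new Period("PT10M").toStandardDuration().getMillis();
    DateTime currMaxTime = new DateTime("2014-03-13T14:25:00Z", DateTimeZone.UTC);

    // Hour-floor of (currMaxTime - window): 14:15 truncates to 14:00, so only
    // sinks whose interval starts before 14:00 are persisted and merged.
    DateTime minTimestampAsDate =
        currMaxTime.minus(windowMillis).hourOfDay().roundFloorCopy();

    System.out.println(minTimestampAsDate);             // 2014-03-13T14:00:00.000Z
    System.out.println(minTimestampAsDate.getMillis()); // the minTimestamp cutoff
  }
}
```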

View File

@ -27,7 +27,8 @@ import org.joda.time.Period;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "none", value = NoopRejectionPolicyFactory.class)
@JsonSubTypes.Type(name = "none", value = NoopRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "test", value = TestRejectionPolicyFactory.class)
})
public interface RejectionPolicyFactory
{

View File

@ -0,0 +1,49 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.plumber;
import org.joda.time.DateTime;
import org.joda.time.Period;
/**
 * A RejectionPolicyFactory intended for tests and examples: the policies it
 * creates accept every event and report an effectively unbounded current max
 * time, so data is handed off promptly regardless of event timestamps.
 */
public class TestRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(Period windowPeriod)
{
return new RejectionPolicy()
{
private final DateTime max = new DateTime(Long.MAX_VALUE);
@Override
public DateTime getCurrMaxTime()
{
return max;
}
@Override
public boolean accept(long timestamp)
{
return true;
}
};
}
}
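
To show what the new policy does in practice, here is a minimal sketch; the example class and the PT10M window value are invented, and this factory ignores the window anyway.
```java
package io.druid.segment.realtime.plumber;

import org.joda.time.Period;

// Placed in the same package as the factory above so no Druid imports are needed.
public class TestRejectionPolicyExample
{
  public static void main(String[] args)
  {
    // The factory ignores the window period, so the value here is arbitrary.
    RejectionPolicyFactory factory = new TestRejectionPolicyFactory();
    RejectionPolicy policy = factory.create(new Period("PT10M"));

    // Every timestamp is accepted, however far in the past or future...
    System.out.println(policy.accept(0L));                         // true
    System.out.println(policy.accept(System.currentTimeMillis())); // true

    // ...and the reported max time is effectively unbounded, so segments
    // hand off promptly regardless of event timestamps.
    System.out.println(policy.getCurrMaxTime());
  }
}
```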

View File

@ -25,7 +25,9 @@ import org.joda.time.Interval;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class)
@JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class),
@JsonSubTypes.Type(name = "custom", value = CustomVersioningPolicy.class)
})
public interface VersioningPolicy
{