Merge branch 'master' into igalDruid

Igal Levy 2014-04-04 10:04:22 -07:00
commit 0b0b8a1d37
96 changed files with 1582 additions and 590 deletions


@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
echo "See also http://druid.io/docs/0.6.73"
echo "See also http://druid.io/docs/0.6.81"


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>


@ -54,10 +54,8 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
"gran": "day"
},
"pathSpec": {
"type": "granularity",
"dataGranularity": "hour",
"inputPath": "s3n:\/\/billy-bucket\/the\/data\/is\/here",
"filePattern": ".*"
"type": "static",
"paths" : "example/path/data.gz,example/path/moredata.gz"
},
"rollupSpec": {
"aggs": [
@ -116,6 +114,20 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
There are multiple types of path specification:
##### `static`
A type of data loader to which a static path to the data files is passed.
|property|description|required?|
|--------|-----------|---------|
|paths|A comma-separated String of input paths indicating where the raw data is located.|yes|
For example, using the static input paths:
```
"paths" : "s3n://billy-bucket/the/data/is/here/data.gz, s3n://billy-bucket/the/data/is/here/moredata.gz, s3n://billy-bucket/the/data/is/here/evenmoredata.gz"
```
##### `granularity`
A type of data loader that expects data to be laid out in a specific path format. Specifically, it expects the data to be segregated by day in the directory format `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (date parts are lowercase, time parts are uppercase).
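For example, the `granularity` path spec that this commit removes from the sample job above looked like this (fields copied from the removed lines):
```
"pathSpec": {
  "type": "granularity",
  "dataGranularity": "hour",
  "inputPath": "s3n://billy-bucket/the/data/is/here",
  "filePattern": ".*"
}
```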


@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.73
git checkout druid-0.6.81
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.73-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.81-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz


@ -66,7 +66,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/indexer
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.73"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.81"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@ -115,7 +115,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/worker
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.73","io.druid.extensions:druid-kafka-seven:0.6.73"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.81","io.druid.extensions:druid-kafka-seven:0.6.81"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod


@ -21,6 +21,10 @@ druid.storage.bucket=druid
druid.storage.baseKey=sample
```
## How do I get HDFS to work?
Make sure to include the `druid-hdfs-storage` module as one of your extensions and set `druid.storage.type=hdfs`.
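A minimal sketch of the relevant properties, assuming the extension coordinate follows the same pattern as the other extensions in this commit and that `druid.storage.storageDirectory` names the HDFS segment directory (both the coordinate and the directory property are assumptions, and the path is hypothetical):
```
druid.extensions.coordinates=["io.druid.extensions:druid-hdfs-storage:0.6.81"]
druid.storage.type=hdfs
# assumed property name, hypothetical path
druid.storage.storageDirectory=hdfs://namenode:9000/druid/segments
```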
## I don't see my Druid segments on my historical nodes
You can check the coordinator console located at `<COORDINATOR_IP>:<PORT>/cluster.html`. Make sure that your segments have actually loaded on [historical nodes](Historical.html). If your segments are not present, check the coordinator logs for messages about capacity or replication errors. One common reason segments fail to download is that historical nodes have a `maxSize` that is too small, making them incapable of downloading more data. You can change that with (for example):
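The example itself falls outside this hunk; a plausible sketch, assuming the property is `druid.server.maxSize` and that it is given in bytes, is:
```
druid.server.maxSize=300000000000
```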
@ -31,7 +35,7 @@ You can check the coordinator console located at `<COORDINATOR_IP>:<PORT>/cluste
## My queries are returning empty results
You can check `<BROKER_IP>:<PORT>/druid/v2/datasources/<YOUR_DATASOURCE>` for the dimensions and metrics that have been created for your datasource. Make sure that the names of the aggregators you use in your query match these metrics. Also make sure that the query interval you specify matches a valid time range where data exists.
You can check `<BROKER_IP>:<PORT>/druid/v2/datasources/<YOUR_DATASOURCE>?interval=0/3000` for the dimensions and metrics that have been created for your datasource. Make sure that the names of the aggregators you use in your query match these metrics. Also make sure that the query interval you specify matches a valid time range where data exists. Note: the broker endpoint will only return valid results on historical segments.
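For example, with a hypothetical datasource named `wikipedia`:
```
curl "http://<BROKER_IP>:<PORT>/druid/v2/datasources/wikipedia?interval=0/3000"
```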
## More information


@ -7,7 +7,7 @@ The plumber handles generated segments both while they are being generated and w
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|Specifies the type of plumber. Each value will have its own configuration schema. Plumbers packaged with Druid are described below.|yes|
|type|String|Specifies the type of plumber. Each value will have its own configuration schema. Plumbers packaged with Druid are described below. The default type is "realtime".|yes|
The following can be configured on the plumber:
@ -16,12 +16,11 @@ The following can be configured on the plumber:
* `maxPendingPersists` is the number of persists a plumber can run concurrently before it starts to block.
* `segmentGranularity` specifies the granularity of the segment, or the amount of time a segment will represent.
* `rejectionPolicy` sets the data acceptance policy for creating and handing off segments; a sketch of how a policy is selected follows this list. The following policies are available:
* `serverTime` &ndash; The default policy, it is optimal for current data that is generated and ingested in real time. Uses `windowPeriod` to accept only those events that are inside the window looking forward and back.
* `serverTime` &ndash; The recommended policy for "current time" data, it is optimal for current data that is generated and ingested in real time. Uses `windowPeriod` to accept only those events that are inside the window looking forward and back.
* `messageTime` &ndash; Can be used for non-"current time" data, as long as that data arrives relatively in sequence. Events are rejected if they are less than `windowPeriod` from the event with the latest timestamp. Hand off only occurs if an event is seen after the `segmentGranularity` and `windowPeriod`.
* `none` &ndash; Never hands off data unless shutdown() is called on the configured firehose.
* `test` &ndash; Useful for testing that handoff is working, *not useful in terms of data integrity*. It uses `segmentGranularity` plus `windowPeriod` as its window.
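A minimal sketch of selecting a policy inside the plumber spec, assuming the standard `type` field convention used elsewhere in these docs:
```
"plumber": {
  "type": "realtime",
  "rejectionPolicy": { "type": "serverTime" }
}
```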
Available Plumbers
------------------


@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.73"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.81"]
druid.zk.service.host=localhost
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.73","io.druid.extensions:druid-kafka-seven:0.6.73"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.81","io.druid.extensions:druid-kafka-seven:0.6.81"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod


@ -118,10 +118,10 @@ The Plumber handles generated segments both while they are being generated and w
* `windowPeriod` is the amount of lag time to allow events. The example configures a 10 minute window, meaning that any event with a timestamp more than 10 minutes in the past will be thrown away and not included in the segment generated by the realtime server.
* `segmentGranularity` specifies the granularity of the segment, or the amount of time a segment will represent.
* `basePersistDirectory` is the directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists.
* `rejectionPolicy` determines what events are rejected upon ingestion.
See [Plumber](Plumber.html) for a fuller discussion of Plumber configuration.
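Putting the four settings above together, a hedged sketch of the corresponding plumber fragment (the window and granularity values are the ones described above; the `/tmp` path and exact field layout are assumptions):
```
"plumber": {
  "type": "realtime",
  "windowPeriod": "PT10m",
  "segmentGranularity": "hour",
  "basePersistDirectory": "/tmp/realtime/basePersist",
  "rejectionPolicy": { "type": "serverTime" }
}
```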
Constraints
-----------


@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.73-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.81-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far, right? That's great! If you cd into the directory:
```
cd druid-services-0.6.73
cd druid-services-0.6.81
```
You should see a bunch of files:


@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.73-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.81-bin.tar.gz)
and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.73"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.81"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.73","io.druid.extensions:druid-kafka-seven:0.6.73"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.81","io.druid.extensions:druid-kafka-seven:0.6.81"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop


@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.73-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.81-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far, right? That's great! If you cd into the directory:
```
cd druid-services-0.6.73
cd druid-services-0.6.81
```
You should see a bunch of files:


@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.73-bin.tar.gz.
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.81-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:


@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.73"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.81"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b


@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.73","io.druid.extensions:druid-kafka-seven:0.6.73","io.druid.extensions:druid-rabbitmq:0.6.73"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.81","io.druid.extensions:druid-kafka-seven:0.6.81","io.druid.extensions:druid-rabbitmq:0.6.81"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>

pom.xml

@ -23,14 +23,14 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
<tag>druid-0.6.73-SNAPSHOT</tag>
<tag>druid-0.6.81-SNAPSHOT</tag>
</scm>
<prerequisites>
@ -313,17 +313,17 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>8.1.11.v20130520</version>
<version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>8.1.11.v20130520</version>
<version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<version>8.1.11.v20130520</version>
<version>9.1.3.v20140225</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>


@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>


@ -25,6 +25,8 @@ import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.guava.YieldingSequenceBase;
import java.io.Closeable;
/**
*/
public class ReferenceCountingSequence<T> extends YieldingSequenceBase<T>
@ -43,6 +45,7 @@ public class ReferenceCountingSequence<T> extends YieldingSequenceBase<T>
OutType initValue, YieldingAccumulator<OutType, T> accumulator
)
{
return new ResourceClosingYielder<OutType>(baseSequence.toYielder(initValue, accumulator), segment.increment());
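      // Increment the segment's reference count *before* building the yielder:
      // Java evaluates arguments left to right, so the one-line version above ran
      // baseSequence.toYielder(...) before segment.increment() took effect.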
final Closeable closeable = segment.increment();
return new ResourceClosingYielder<OutType>(baseSequence.toYielder(initValue, accumulator), closeable);
}
}


@ -37,11 +37,11 @@ public class HyperLogLogCollectorTest
{
private final HashFunction fn = Hashing.murmur3_128();
private final Random random = new Random();
@Test
public void testFolding() throws Exception
{
final Random random = new Random(0);
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
for (int numThings : numValsToCheck) {
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
@ -154,6 +154,7 @@ public class HyperLogLogCollectorTest
@Test
public void testFoldingByteBuffers() throws Exception
{
final Random random = new Random(0);
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
for (int numThings : numValsToCheck) {
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
@ -186,6 +187,7 @@ public class HyperLogLogCollectorTest
@Test
public void testFoldingReadOnlyByteBuffers() throws Exception
{
final Random random = new Random(0);
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
for (int numThings : numValsToCheck) {
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
@ -221,6 +223,7 @@ public class HyperLogLogCollectorTest
@Test
public void testFoldingReadOnlyByteBuffersWithArbitraryPosition() throws Exception
{
final Random random = new Random(0);
final int[] numValsToCheck = {10, 20, 50, 100, 1000, 2000};
for (int numThings : numValsToCheck) {
HyperLogLogCollector allCombined = HyperLogLogCollector.makeLatestCollector();
@ -482,9 +485,11 @@ public class HyperLogLogCollectorTest
return retVal;
}
//@Test // This test can help when finding potential combinations that are weird, but it's non-deterministic
@Ignore @Test // This test can help when finding potential combinations that are weird, but it's non-deterministic
public void testFoldingwithDifferentOffsets() throws Exception
{
// final Random random = new Random(37); // this seed will cause this test to fail because of slightly larger errors
final Random random = new Random(0);
for (int j = 0; j < 10; j++) {
HyperLogLogCollector smallVals = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector bigVals = HyperLogLogCollector.makeLatestCollector();
@ -511,9 +516,10 @@ public class HyperLogLogCollectorTest
}
}
//@Test
@Ignore @Test
public void testFoldingwithDifferentOffsets2() throws Exception
{
final Random random = new Random(0);
MessageDigest md = MessageDigest.getInstance("SHA-1");
for (int j = 0; j < 1; j++) {
@ -632,6 +638,7 @@ public class HyperLogLogCollectorTest
@Test
public void testSparseEstimation() throws Exception
{
final Random random = new Random(0);
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
for (int i = 0; i < 100; ++i) {
@ -742,6 +749,54 @@ public class HyperLogLogCollectorTest
}
}
@Test
public void testMaxOverflow() {
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
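    // Record a value of 16 at register position 23; the assertions below verify it
    // is tracked in the max-overflow slot (the register offset and non-zero register
    // count stay at zero) rather than in the normal register buckets.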
collector.add((short)23, (byte)16);
Assert.assertEquals(23, collector.getMaxOverflowRegister());
Assert.assertEquals(16, collector.getMaxOverflowValue());
Assert.assertEquals(0, collector.getRegisterOffset());
Assert.assertEquals(0, collector.getNumNonZeroRegisters());
collector.add((short)56, (byte)17);
Assert.assertEquals(56, collector.getMaxOverflowRegister());
Assert.assertEquals(17, collector.getMaxOverflowValue());
collector.add((short)43, (byte)16);
Assert.assertEquals(56, collector.getMaxOverflowRegister());
Assert.assertEquals(17, collector.getMaxOverflowValue());
Assert.assertEquals(0, collector.getRegisterOffset());
Assert.assertEquals(0, collector.getNumNonZeroRegisters());
}
@Test
public void testMergeMaxOverflow() {
// no offset
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
collector.add((short)23, (byte)16);
HyperLogLogCollector other = HyperLogLogCollector.makeLatestCollector();
collector.add((short)56, (byte)17);
collector.fold(other);
Assert.assertEquals(56, collector.getMaxOverflowRegister());
Assert.assertEquals(17, collector.getMaxOverflowValue());
// different offsets
// fill up all the buckets so we reach a registerOffset of 49
collector = HyperLogLogCollector.makeLatestCollector();
fillBuckets(collector, (byte) 0, (byte) 49);
collector.add((short)23, (byte)65);
other = HyperLogLogCollector.makeLatestCollector();
fillBuckets(other, (byte) 0, (byte) 43);
other.add((short)47, (byte)67);
collector.fold(other);
Assert.assertEquals(47, collector.getMaxOverflowRegister());
Assert.assertEquals(67, collector.getMaxOverflowValue());
}
private static void fillBuckets(HyperLogLogCollector collector, byte startOffset, byte endOffset)
{
@ -756,7 +811,7 @@ public class HyperLogLogCollectorTest
}
// Provides a nice printout of error rates as a function of cardinality
//@Test
@Ignore @Test
public void showErrorRate() throws Exception
{
HashFunction fn = Hashing.murmur3_128();


@ -1,10 +1,10 @@
all : druid
all : druid sigmod zip
druid : druid.pdf
sigmod : sgmd0658-yang.pdf
sigmod : modii658-yang.pdf
zip : sgmd0658-yang.zip
zip : modii658-yang.zip
%.zip : %.pdf
@rm -f dummy.ps

Binary file not shown.


@ -1,4 +1,5 @@
\documentclass{sig-alternate-2013}
\pdfminorversion=4
\newfont{\mycrnotice}{ptmr8t at 7pt}
\newfont{\myconfname}{ptmri8t at 7pt}
@ -60,28 +61,28 @@
\maketitle
\begin{abstract}
Druid is an open source\footnote{\href{http://druid.io/}{http://druid.io/} \href{https://github.com/metamx/druid}{https://github.com/metamx/druid}}
data store designed for real-time exploratory analytics on large data sets.
The system combines a column-oriented storage layout, a distributed,
shared-nothing architecture, and an advanced indexing structure to allow for
the arbitrary exploration of billion-row tables with sub-second latencies. In
this paper, we describe Druid's architecture, and detail how it supports fast
aggregations, flexible filters, and low latency data ingestion.
\end{abstract}
% A category with the (minimum) three required fields
\category{H.2.4}{Database Management}{Systems}[Distributed databases]
% \category{D.2.8}{Software Engineering}{Metrics}[complexity measures, performance measures]
\keywords{distributed; real-time; fault-tolerant; analytics; column-oriented; OLAP}
\keywords{distributed; real-time; fault-tolerant; highly available; open source; analytics; column-oriented; OLAP}
\section{Introduction}
In recent years, the proliferation of internet technology has
created a surge in machine-generated events. Individually, these
events contain minimal useful information and are of low value. Given the
time and resources required to extract meaning from large collections of
events, many companies were willing to discard this data instead. Although
infrastructure has been built to handle event-based data (e.g. IBM's
Netezza\cite{singh2011introduction}, HP's Vertica\cite{bear2012vertica}, and EMC's
Greenplum\cite{miner2012unified}), they are largely sold at high price points
@ -89,36 +90,36 @@ and are only targeted towards those companies who can afford the offering.
A few years ago, Google introduced MapReduce \cite{dean2008mapreduce} as their
mechanism of leveraging commodity hardware to index the internet and analyze
logs. The Hadoop \cite{shvachko2010hadoop} project soon followed and was
largely patterned after the insights that came out of the original MapReduce
paper. Hadoop is currently deployed in many organizations to store and analyze
large amounts of log data. Hadoop has contributed much to helping companies
convert their low-value event streams into high-value aggregates for a variety
of applications such as business intelligence and A-B testing.
As with a lot of great systems, Hadoop has opened our eyes to a new space of
problems. Specifically, Hadoop excels at storing and providing access to large
As with many great systems, Hadoop has opened our eyes to a new space of
problems. Specifically, Hadoop excels at storing and providing access to large
amounts of data, however, it does not make any performance guarantees around
how quickly that data can be accessed. Furthermore, although Hadoop is a
highly available system, performance degrades under heavy concurrent load.
Lastly, while Hadoop works well for storing data, it is not optimized for
ingesting data and making that data immediately readable.
Early on in the development of the Metamarkets product, we ran into each of
these issues and came to the realization that Hadoop is a great back-office,
batch processing, and data warehousing system. However, as a company that has
product-level guarantees around query performance and data availability in a
highly concurrent environment (1000+ users), Hadoop wasn't going to meet our
needs. We explored different solutions in the space, and after
trying both Relational Database Management Systems and NoSQL architectures, we
came to the conclusion that there was nothing in the open source world that
could be fully leveraged for our requirements. We ended up creating Druid, an
open-source, distributed, column-oriented, real-time analytical data store. In
open source, distributed, column-oriented, real-time analytical data store. In
many ways, Druid shares similarities with other OLAP systems
\cite{oehler2012ibm, schrader2009oracle, lachev2005applied},
interactive query systems \cite{melnik2010dremel}, main-memory databases
\cite{farber2012sap}, as well as widely known distributed data stores
\cite{chang2008bigtable, decandia2007dynamo, lakshman2010cassandra}. The
distribution and query model also borrow ideas from current generation search
infrastructure \cite{linkedin2013senseidb, apache2013solr,
banon2013elasticsearch}.
@ -130,10 +131,10 @@ potential method of solving it. Druid is deployed in production at several
technology
companies\footnote{\href{http://druid.io/druid.html}{http://druid.io/druid.html}}.
The structure of the paper is as follows: we first describe the problem in
Section \ref{sec:problem-definition}. Next, we detail system architecture from
the point of view of how data flows through the system in Section
\ref{sec:architecture}. We then discuss how and why data gets converted into a
binary format in Section \ref{sec:storage-format}. We briefly describe the
query API in Section \ref{sec:query-api} and present performance results
in Section \ref{sec:benchmarks}. Lastly, we leave off with our lessons from
running Druid in production in Section \ref{sec:production}, and related work
@ -161,7 +162,7 @@ Druid was originally designed to solve problems around ingesting and exploring
large quantities of transactional events (log data). This form of timeseries
data is commonly found in OLAP workflows and the nature of the data tends to be
very append heavy. For example, consider the data shown in
Table~\ref{tab:sample_data}. Table~\ref{tab:sample_data} contains data for
edits that have occurred on Wikipedia. Each time a user edits a page in
Wikipedia, an event is generated that contains metadata about the edit. This
metadata is comprised of 3 distinct components. First, there is a timestamp
@ -170,7 +171,7 @@ columns indicating various attributes about the edit such as the page that was
edited, the user who made the edit, and the location of the user. Finally,
there are a set of metric columns that contain values (usually numeric) that
can be aggregated, such as the number of characters added or removed in an
edit.
Our goal is to rapidly compute drill-downs and aggregates over this data. We
want to answer questions like “How many edits were made on the page Justin
@ -184,45 +185,45 @@ Relational Database Management Systems (RDBMS) and NoSQL key/value stores were
unable to provide a low latency data ingestion and query platform for
interactive applications \cite{tschetter2011druid}. In the early days of
Metamarkets, we were focused on building a hosted dashboard that would allow
users to arbitrarily explore and visualize event streams. The data store
powering the dashboard needed to return queries fast enough that the data
visualizations built on top of it could provide users with an interactive
experience.
In addition to the query latency needs, the system had to be multi-tenant and
highly available. The Metamarkets product is used in a highly concurrent
environment. Downtime is costly and many businesses cannot afford to wait if a
system is unavailable in the face of software upgrades or network failure.
Downtime for startups, who often lack proper internal operations management, can
determine business success or failure.
Finally, another key problem that Metamarkets faced in its early days was to
Finally, another challenge that Metamarkets faced in its early days was to
allow users and alerting systems to be able to make business decisions in
``real-time". The time from when an event is created to when that
event is queryable determines how fast users and systems are able to react to
potentially catastrophic occurrences in their systems. Popular open source data
warehousing systems such as Hadoop were unable to provide the sub-second data ingestion
latencies we required.
``real-time". The time from when an event is created to when that event is
queryable determines how fast interested parties are able to react to
potentially catastrophic situations in their systems. Popular open source data
warehousing systems such as Hadoop were unable to provide the sub-second data
ingestion latencies we required.
The problems of data exploration, ingestion, and availability span multiple
industries. Since Druid was open sourced in October 2012, it been deployed as a
video, network monitoring, operations monitoring, and online advertising
analytics platform in multiple companies.
analytics platform at multiple companies.
\section{Architecture}
\label{sec:architecture}
A Druid cluster consists of different types of nodes and each node type is
designed to perform a specific set of things. We believe this design separates
concerns and simplifies the complexity of the system. The different node types
operate fairly independent of each other and there is minimal interaction
among them. Hence, intra-cluster communication failures have minimal impact
on data availability.
concerns and simplifies the complexity of the overall system. The different
node types operate fairly independent of each other and there is minimal
interaction among them. Hence, intra-cluster communication failures have
minimal impact on data availability.
To solve complex data analysis problems, the different
node types come together to form a fully working system. The composition of and
flow of data in a Druid cluster are shown in Figure~\ref{fig:cluster}. The name Druid comes from the Druid class in many role-playing games: it is a
shape-shifter, capable of taking on many different forms to fulfill various
different roles in a group.
To solve complex data analysis problems, the different node types come together
to form a fully working system. The name Druid comes from the Druid class in
many role-playing games: it is a shape-shifter, capable of taking on many
different forms to fulfill various different roles in a group. The composition
of and flow of data in a Druid cluster are shown in Figure~\ref{fig:cluster}.
\begin{figure*}
\centering
@ -238,14 +239,14 @@ streams. Events indexed via these nodes are immediately available for querying.
The nodes are only concerned with events for some small time range and
periodically hand off immutable batches of events they have collected over this
small time range to other nodes in the Druid cluster that are specialized in
dealing with batches of immutable events. Real-time nodes leverage Zookeeper
dealing with batches of immutable events. Real-time nodes leverage ZooKeeper
\cite{hunt2010zookeeper} for coordination with the rest of the Druid cluster.
The nodes announce their online state and the data they are serving in
Zookeeper.
The nodes announce their online state and the data they serve in
ZooKeeper.
Real-time nodes maintain an in-memory index buffer for all incoming events.
These indexes are incrementally populated as new events are ingested and the
indexes are also directly queryable. Druid behaves as a row store
These indexes are incrementally populated as events are ingested and the
indexes are also directly queryable. Druid behaves as a row store
for queries on events that exist in this JVM heap-based buffer. To avoid heap
overflow problems, real-time nodes persist their in-memory indexes to disk
either periodically or after some maximum row limit is reached. This persist
@ -255,10 +256,10 @@ index is immutable and real-time nodes load persisted indexes into off-heap
memory such that they can still be queried. This process is described in detail
in \cite{o1996log} and is illustrated in Figure~\ref{fig:realtime_flow}.
\begin{figure}
\centering
\begin{figure}
\centering
\includegraphics[width = 2.6in]{realtime_flow}
\caption{Real-time nodes buffer events to an in-memory index, which is
\caption{Real-time nodes buffer events to an in-memory index, which is
regularly persisted to disk. On a periodic basis, persisted indexes are then merged
together before getting handed off.
Queries will hit both the in-memory and persisted indexes.
@ -269,7 +270,7 @@ Queries will hit both the in-memory and persisted indexes.
On a periodic basis, each real-time node will schedule a background task that
searches for all locally persisted indexes. The task merges these indexes
together and builds an immutable block of data that contains all the events
that have ingested by a real-time node for some span of time. We refer to this
that have been ingested by a real-time node for some span of time. We refer to this
block of data as a ``segment". During the handoff stage, a real-time node
uploads this segment to a permanent backup storage, typically a distributed
file system such as S3 \cite{decandia2007dynamo} or HDFS
@ -280,20 +281,20 @@ of the processes.
Figure~\ref{fig:realtime_timeline} illustrates the operations of a real-time
node. The node starts at 13:37 and will only accept events for the current hour
or the next hour. When events are ingested, the node announces that it is
serving a segment of data for an interval from 13:00 to 14:00. Every 10
minutes (the persist period is configurable), the node will flush and persist
its in-memory buffer to disk. Near the end of the hour, the node will likely
see events for 14:00 to 15:00. When this occurs, the node prepares to serve
data for the next hour and creates a new in-memory index. The node then
announces that it is also serving a segment from 14:00 to 15:00. The node does
not immediately merge persisted indexes from 13:00 to 14:00, instead it waits
for a configurable window period for straggling events from 13:00 to 14:00 to
arrive. This window period minimizes the risk of data loss from delays in event
delivery. At the end of the window period, the node merges all persisted
indexes from 13:00 to 14:00 into a single immutable segment and hands the
segment off. Once this segment is loaded and queryable somewhere else in the
Druid cluster, the real-time node flushes all information about the data it
collected for 13:00 to 14:00 and unannounces it is serving this data.
\begin{figure*}
\centering
@ -306,7 +307,7 @@ real-time node operations are configurable.}
\subsubsection{Availability and Scalability}
Real-time nodes are a consumer of data and require a corresponding producer to
provide the data stream. Commonly, for data durability purposes, a message
bus such as Kafka \cite{kreps2011kafka} sits between the producer and the
real-time node as shown in Figure~\ref{fig:realtime_pipeline}. Real-time nodes
ingest data by reading events from the message bus. The time from event
@ -321,7 +322,7 @@ milliseconds.
\end{figure}
The purpose of the message bus in Figure~\ref{fig:realtime_pipeline} is
two-fold. First, the message bus acts as a buffer for incoming events. A
message bus such as Kafka maintains positional offsets indicating how far a
consumer (a real-time node) has read in an event stream. Consumers can
programmatically update these offsets. Real-time nodes update this offset each
@ -337,7 +338,7 @@ multiple real-time nodes can read events. Multiple real-time nodes can ingest
the same set of events from the bus, creating a replication of events. In a
scenario where a node completely fails and loses disk, replicated streams
ensure that no data is lost. A single ingestion endpoint also allows for data
streams for be partitioned such that multiple real-time nodes each ingest a
streams to be partitioned such that multiple real-time nodes each ingest a
portion of a stream. This allows additional real-time nodes to be seamlessly
added. In practice, this model has allowed one of the largest production Druid
clusters to be able to consume raw data at approximately 500 MB/s (150,000
@ -347,22 +348,22 @@ events/s or 2 TB/hour).
Historical nodes encapsulate the functionality to load and serve the immutable
blocks of data (segments) created by real-time nodes. In many real-world
workflows, most of the data loaded in a Druid cluster is immutable and hence,
historical nodes are typically the main workers of a Druid cluster. Historical
nodes follow a shared-nothing architecture and there is no single point of
contention among the nodes. The nodes have no knowledge of one another and are
operationally simple; they only know how to load, drop, and serve immutable
segments.
Similar to real-time nodes, historical nodes announce their online state and
the data they are serving in Zookeeper. Instructions to load and drop segments
are sent over Zookeeper and contain information about where the segment is
located in deep storage and how to decompress and process the segment. Before
the data they are serving in ZooKeeper. Instructions to load and drop segments
are sent over ZooKeeper and contain information about where the segment is
located in deep storage and how to decompress and process the segment. Before
a historical node downloads a particular segment from deep storage, it first
checks a local cache that maintains information about what segments already
exist on the node. If information about a segment is not present in the cache,
the historical node will proceed to download the segment from deep storage.
This process is shown in Figure~\ref{fig:historical_download}. Once processing
is complete, the segment is announced in Zookeeper. At this point, the segment
is complete, the segment is announced in ZooKeeper. At this point, the segment
is queryable. The local cache also allows for historical nodes to be quickly
updated and restarted. On startup, the node examines its cache and immediately
serves whatever data it finds.
@ -378,7 +379,7 @@ Historical nodes can support read consistency because they only deal with
immutable data. Immutable data blocks also enable a simple parallelization
model: historical nodes can concurrently scan and aggregate immutable blocks
without blocking.
\subsubsection{Tiers}
\label{sec:tiers}
Historical nodes can be grouped in different tiers, where all nodes in a
@ -393,20 +394,20 @@ can also be created with much less powerful backing hardware. The
“cold” cluster would only contain less frequently accessed segments.
\subsubsection{Availability}
Historical nodes depend on Zookeeper for segment load and unload instructions.
If Zookeeper becomes unavailable, historical nodes are no longer able to serve
new data and drop outdated data, however, because the queries are served over
HTTP, historical nodes are still be able to respond to query requests for
the data they are currently serving. This means that Zookeeper outages do not
impact current data availability on historical nodes.
Historical nodes depend on ZooKeeper for segment load and unload instructions.
Should ZooKeeper become unavailable, historical nodes are no longer able to serve
new data or drop outdated data, however, because the queries are served over
HTTP, historical nodes are still able to respond to query requests for
the data they are currently serving. This means that ZooKeeper outages do not
impact current data availability on historical nodes.
\subsection{Broker Nodes}
Broker nodes act as query routers to historical and real-time nodes. Broker
nodes understand the metadata published in Zookeeper about what segments are
Broker nodes act as query routers to historical and real-time nodes. They
understand the metadata published in ZooKeeper about what segments are
queryable and where those segments are located. Broker nodes route incoming queries
such that the queries hit the right historical or real-time nodes. Broker nodes
also merge partial results from historical and real-time nodes before returning
a final consolidated result to the caller.
a final consolidated result to the caller.
\subsubsection{Caching}
\label{sec:caching}
@ -417,12 +418,12 @@ distributed key/value store such as Memcached
first maps the query to a set of segments. Results for certain segments may
already exist in the cache and there is no need to recompute them. For any
results that do not exist in the cache, the broker node will forward the query
to the correct historical and real-time nodes. Once historical nodes return
their results, the broker will cache these results on a per segment basis for
future use. This process is illustrated in Figure~\ref{fig:caching}. Real-time
data is never cached and hence requests for real-time data will always be
forwarded to real-time nodes. Real-time data is perpetually changing and
caching the results would be unreliable.
forwarded to real-time nodes. Real-time data is perpetually changing and
caching the results is unreliable.
\begin{figure*}
\centering
@ -436,20 +437,20 @@ that all historical nodes fail, it is still possible to query results if those
results already exist in the cache.
\subsubsection{Availability}
In the event of a total Zookeeper outage, data is still queryable. If broker
nodes are unable to communicate to Zookeeper, they use their last known view of
In the event of a total ZooKeeper outage, data is still queryable. If broker
nodes are unable to communicate to ZooKeeper, they use their last known view of
the cluster and continue to forward queries to real-time and historical nodes.
Broker nodes make the assumption that the structure of the cluster is the same
as it was before the outage. In practice, this availability model has allowed
our Druid cluster to continue serving queries for a significant period of time while we
diagnosed Zookeeper outages.
diagnosed ZooKeeper outages.
\subsection{Coordinator Nodes}
Druid coordinator nodes are primarily in charge of data management and
distribution on historical nodes. The coordinator nodes tell historical nodes
to load new data, drop outdated data, replicate data, and move data to load
balance. Druid uses a multi-version concurrency control swapping protocol for
managing immutable segments in order to maintain stable views. If any
immutable segment contains data that is wholly obsoleted by newer segments, the
outdated segment is dropped from the cluster. Coordinator nodes undergo a
leader-election process that determines a single node that runs the coordinator
@ -458,7 +459,7 @@ functionality. The remaining coordinator nodes act as redundant backups.
A coordinator node runs periodically to determine the current state of the
cluster. It makes decisions by comparing the expected state of the cluster with
the actual state of the cluster at the time of the run. As with all Druid
nodes, coordinator nodes maintain a Zookeeper connection for current cluster
nodes, coordinator nodes maintain a ZooKeeper connection for current cluster
information. Coordinator nodes also maintain a connection to a MySQL
database that contains additional operational parameters and configurations.
One of the key pieces of information located in the MySQL database is a table
@ -472,7 +473,7 @@ Rules govern how historical segments are loaded and dropped from the cluster.
Rules indicate how segments should be assigned to different historical node
tiers and how many replicates of a segment should exist in each tier. Rules may
also indicate when segments should be dropped entirely from the cluster. Rules
are usually set for a period of time. For example, a user may use rules to
load the most recent one month's worth of segments into a ``hot" cluster, the
most recent one year's worth of segments into a ``cold" cluster, and drop any
segments that are older.
@ -488,19 +489,19 @@ of segments. Since each historical node has limited resources, segments must be
distributed among the cluster to ensure that the cluster load is not too
imbalanced. Determining optimal load distribution requires some knowledge about
query patterns and speeds. Typically, queries cover recent segments spanning
contiguous time intervals for a single data source. On average, queries that
access smaller segments are faster.
These query patterns suggest replicating recent historical segments at a higher
rate, spreading out large segments that are close in time to different
historical nodes, and co-locating segments from different data sources. To
optimally distribute and balance segments among the cluster, we developed a
cost-based optimization procedure that takes into account the segment data
source, recency, and size. The exact details of the algorithm are beyond the
scope of this paper and may be discussed in future literature.
\subsubsection{Replication}
Coordinator nodes may tell different historical nodes to load copies of the
Coordinator nodes may tell different historical nodes to load a copy of the
same segment. The number of replicates in each tier of the historical compute
cluster is fully configurable. Setups that require high levels of fault
tolerance can be configured to have a high number of replicas. Replicated
@ -513,17 +514,17 @@ cluster. Over the last two years, we have never taken downtime in our Druid
cluster for software upgrades.
\subsubsection{Availability}
Druid coordinator nodes have two external dependencies: Zookeeper and MySQL.
Coordinator nodes rely on Zookeeper to determine what historical nodes already
exist in the cluster. If Zookeeper becomes unavailable, the coordinator will no
Druid coordinator nodes have ZooKeeper and MySQL as external dependencies.
Coordinator nodes rely on ZooKeeper to determine what historical nodes already
exist in the cluster. If ZooKeeper becomes unavailable, the coordinator will no
longer be able to send instructions to assign, balance, and drop segments.
However, these operations do not affect data availability at all.
The design principle for responding to MySQL and Zookeeper failures is the
The design principle for responding to MySQL and ZooKeeper failures is the
same: if an external dependency responsible for coordination fails, the cluster
maintains the status quo. Druid uses MySQL to store operational management
information and segment metadata information about what segments should exist
in the cluster. If MySQL goes down, this information becomes unavailable to
coordinator nodes. However, this does not mean data itself is unavailable. If
coordinator nodes cannot communicate to MySQL, they will cease to assign new
segments and drop outdated ones. Broker, historical, and real-time nodes are still
@ -534,10 +535,10 @@ queryable during MySQL outages.
Data tables in Druid (called \emph{data sources}) are collections of
timestamped events and partitioned into a set of segments, where each segment
is typically 5--10 million rows. Formally, we define a segment as a collection
of rows of data that span some period in time. Segments represent the
of rows of data that span some period of time. Segments represent the
fundamental storage unit in Druid and replication and distribution are done at
a segment level.
Druid always requires a timestamp column as a method of simplifying data
distribution policies, data retention policies, and first-level query pruning.
Druid partitions its data sources into well-defined time intervals, typically
@ -549,16 +550,16 @@ with timestamps spread over a day is better partitioned by hour.
Segments are uniquely identified by a data source identifer, the time interval
of the data, and a version string that increases whenever a new segment is
created. The version string indicates the freshness of segment data; segments
with later versions have newer views of data (over some time range) than
segments with older versions. This segment metadata is used by the system for
concurrency control; read operations always access data in a particular time
range from the segments with the latest version identifiers for that time
range.
Druid segments are stored in a column orientation. Given that Druid is best
used for aggregating event streams (all data going into Druid must have a
timestamp), the advantages storing aggregate information as columns rather than
timestamp), the advantages of storing aggregate information as columns rather than
rows are well documented \cite{abadi2008column}. Column storage allows for more
efficient CPU usage as only what is needed is actually loaded and scanned. In a
row oriented data store, all columns associated with a row must be scanned as
@ -573,7 +574,7 @@ contain strings. Storing strings directly is unnecessarily costly and string
columns can be dictionary encoded instead. Dictionary encoding is a common
method to compress data and has been used in other data stores such as
PowerDrill \cite{hall2012processing}. In the example in
Table~\ref{tab:sample_data}, we can map each page to an unique integer
Table~\ref{tab:sample_data}, we can map each page to a unique integer
identifier.
{\small\begin{verbatim}
Justin Bieber -> 0
@ -607,7 +608,7 @@ representations.
In many real world OLAP workflows, queries are issued for the aggregated
results of some set of metrics where some set of dimension specifications are
met. An example query is: ``How many Wikipedia edits were done by users in
San Francisco who are also male?". This query is filtering the Wikipedia data
San Francisco who are also male?" This query is filtering the Wikipedia data
set in Table~\ref{tab:sample_data} based on a Boolean expression of dimension
values. In many real world data sets, dimension columns contain strings and
metric columns contain numeric values. Druid creates additional lookup
@ -657,9 +658,9 @@ resorted the data set rows to maximize compression.
In the unsorted case, the total Concise size was 53,451,144 bytes and the total
integer array size was 127,248,520 bytes. Overall, Concise compressed sets are
about 42\% smaller than integer arrays. In the sorted case, the total Concise
compressed size was 43,832,884 bytes and the total integer array size was
127,248,520 bytes. What is interesting to note is that after sorting, global
compression only increased minimally.
\subsection{Storage Engine}
@ -673,7 +674,7 @@ memory-mapped storage engine but could be a better alternative if performance
is critical. By default, a memory-mapped storage engine is used.
When using a memory-mapped storage engine, Druid relies on the operating system
to page segments in and out of memory. Given that segments can only be scanned
if they are loaded in memory, a memory-mapped storage engine allows recent
segments to retain in memory whereas segments that are never queried are paged
out. The main drawback with using the memory-mapped storage engine is when a
@ -694,8 +695,8 @@ JSON object containing the aggregated metrics over the time period.
Most query types will also support a filter set. A filter set is a Boolean
expression of dimension name and value pairs. Any number and combination of
dimensions and values may be specified. When a filter set is provided, only
the subset of the data that pertains to the filter set will be scanned. The
ability to handle complex nested filter sets is what enables Druid to drill
into data at any depth.
@ -716,7 +717,7 @@ A sample count query over a week of data is as follows:
}
\end{verbatim}}
The query shown above will return a count of the number of rows in the Wikipedia data source
from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the ``page" dimension is
equal to ``Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
{\scriptsize\begin{verbatim}
[ {
@ -734,19 +735,19 @@ equal to ``Ke\$ha". The results will be bucketed by day and will be a JSON array
} ]
\end{verbatim}}
Druid supports many types of aggregations including double sums, long sums,
Druid supports many types of aggregations including sums on floating-point and integer types,
minimums, maximums, and complex aggregations such as cardinality estimation and
approximate quantile estimation. The results of aggregations can be combined
in mathematical expressions to form other aggregations. It is beyond the scope
of this paper to fully describe the query API but more information can be found
online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}.
As of this writing, a join query for Druid is not yet implemented. This has
been a function of engineering resource allocation and use case decisions more
than a decision driven by technical merit. Indeed, Druid's storage format
would allow for the implementation of joins (there is no loss of fidelity for
columns included as dimensions) and the implementation of them has been a
conversation that we have every few months. To date, we have made the choice
that the implementation cost is not worth the investment for our organization.
The reasons for this decision are generally two-fold.
@ -756,30 +757,29 @@ The reasons for this decision are generally two-fold.
\end{enumerate}
A join query is essentially the merging of two or more streams of data based on
a shared set of keys. The primary high-level strategies for join queries the
authors are aware of are a hash-based strategy or a sorted-merge strategy. The
a shared set of keys. The primary high-level strategies for join queries we
are aware of are a hash-based strategy or a sorted-merge strategy. The
hash-based strategy requires that all but one data set be available as
something that looks like a hash table; a lookup operation is then performed on
this hash table for every row in the ``primary" stream. The sorted-merge
strategy assumes that each stream is sorted by the join key and thus allows for
the incremental joining of the streams. Each of these strategies, however,
requires the materialization of some number of the streams either in sorted
order or in a hash table form.
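To illustrate, the following is a minimal sketch of the hash-based strategy
under a simplified row representation of our own; it is not code from any of
the systems discussed. One side is materialized into a hash table and the
``primary" stream probes it row by row.
{\scriptsize\begin{verbatim}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch
{
  static List<Map<String, Object>> hashJoin(
      Iterable<Map<String, Object>> primary,
      Iterable<Map<String, Object>> other,
      String joinKey
  )
  {
    // build phase: fully materialize the non-primary side
    Map<Object, Map<String, Object>> built = new HashMap<>();
    for (Map<String, Object> row : other) {
      built.put(row.get(joinKey), row);
    }

    // probe phase: one hash lookup per row of the primary stream
    List<Map<String, Object>> joined = new ArrayList<>();
    for (Map<String, Object> row : primary) {
      Map<String, Object> match = built.get(row.get(joinKey));
      if (match != null) {
        Map<String, Object> out = new HashMap<>(row);
        out.putAll(match);
        joined.add(out);
      }
    }
    return joined;
  }
}
\end{verbatim}}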
When all sides of the join are large tables (> 1 billion records),
materializing the pre-join streams requires complex distributed memory
management. The complexity of the memory management is only amplified by
the fact that we are targeting highly concurrent, multitenant workloads.
This is, as far as the authors are aware, an active academic research
problem that we would be more than willing to engage with the academic
community to help resolving in a scalable manner.
This is, as far as we are aware, an active academic research
problem that we would be willing to help resolve in a scalable manner.
\section{Performance}
\label{sec:benchmarks}
Druid runs in production at several organizations, and to demonstrate its
performance, we have chosen to share some real world numbers for the main production
cluster running at Metamarkets in early 2014. For comparison with other databases
cluster running at Metamarkets as of early 2014. For comparison with other databases
we also include results from synthetic workloads on TPC-H data.
\subsection{Query Performance in Production}
@ -789,7 +789,7 @@ based on a given metric is much more expensive than a simple count over a time
range. To showcase the average query latencies in a production Druid cluster,
we selected 8 of our most queried data sources, described in Table~\ref{tab:datasources}.
Approximately 30\% of the queries are standard
Approximately 30\% of queries are standard
aggregates involving different types of metrics and filters, 60\% of queries
are ordered group bys over one or more dimensions with aggregates, and 10\% of
queries are search queries and metadata retrieval queries. The number of
@ -827,7 +827,7 @@ approximately 10TB of segments loaded. Collectively,
there are about 50 billion Druid rows in this tier. Results for
every data source are not shown.
\item The hot tier uses Intel Xeon E5-2670 processors and consists of 1302 processing
\item The hot tier uses Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2670 processors and consists of 1302 processing
threads and 672 total cores (hyperthreaded).
\item A memory-mapped storage engine was used (the machine was configured to
@ -838,28 +838,28 @@ Query latencies are shown in Figure~\ref{fig:query_latency} and the queries per
minute are shown in Figure~\ref{fig:queries_per_min}. Across all the various
data sources, average query latency is approximately 550 milliseconds, with
90\% of queries returning in less than 1 second, 95\% in under 2 seconds, and
99\% of queries returning in less than 10 seconds. Occasionally we observe
spikes in latency, as observed on February 19, in which case network issues on
99\% of queries returning in less than 10 seconds. Occasionally we observe
spikes in latency, as observed on February 19, where network issues on
the Memcached instances were compounded by very high query load on one of our
largest datasources.
largest data sources.
\begin{figure}
\centering
\includegraphics[width = 2.3in]{avg_query_latency}
\includegraphics[width = 2.3in]{query_percentiles}
\caption{Query latencies of production data sources.}
\label{fig:query_latency}
\end{figure}
\begin{figure}
\centering
\includegraphics[width = 2.8in]{queries_per_min}
\caption{Queries per minute of production data sources.}
\label{fig:queries_per_min}
\end{figure}
\subsection{Query Benchmarks on TPC-H Data}
We also present Druid benchmarks on TPC-H data.
Most TPC-H queries do not directly apply to Druid, so we
selected queries more typical of Druid's workload to demonstrate query performance. As a
comparison, we also provide the results of the same queries using MySQL with the
@ -871,8 +871,8 @@ open source column store because we were not confident we could correctly tune
it for optimal performance.
Our Druid setup used Amazon EC2
\texttt{m3.2xlarge} (Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz) instances for
historical nodes and \texttt{c3.2xlarge} (Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz) instances for broker
\texttt{m3.2xlarge} instance types (Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2680 v2 @ 2.80GHz) for
historical nodes and \texttt{c3.2xlarge} instances (Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2670 v2 @ 2.50GHz) for broker
nodes. Our MySQL setup was an Amazon RDS instance that ran on the same \texttt{m3.2xlarge} instance type.
The results for the 1 GB TPC-H data set are shown
@ -918,7 +918,7 @@ well.
To showcase Druid's data ingestion latency, we selected several production
data sources of varying dimensions, metrics, and event volumes. Our production
ingestion setup consists of 6 nodes, totalling 360GB of RAM and 96 cores
(12 x Intel Xeon E5-2670).
(12 x Intel\textsuperscript{\textregistered} Xeon\textsuperscript{\textregistered} E5-2670).
Note that in this setup, several other data sources were being ingested and
many other Druid-related ingestion tasks were running concurrently on the machines.
@ -931,7 +931,7 @@ aggregations we want to perform on those metrics. With the most basic data set
800,000 events/second/core, which is really just a measurement of how fast we can
deserialize events. Real world data sets are never this simple.
Table~\ref{tab:ingest_datasources} shows a selection of data sources and their
characteristics.
\begin{table}
\centering
@ -974,9 +974,9 @@ running an Amazon \texttt{cc2.8xlarge} instance.
The latency measurements we presented are sufficient to address the stated
problems of interactivity. We would prefer the variability in the latencies to
be less. It is still very possible to decrease latencies by adding
be less. It is still possible to decrease latencies by adding
additional hardware, but we have not chosen to do so because infrastructure
costs are still a consideration to us.
costs are still a consideration for us.
\section{Druid in Production}\label{sec:production}
Over the last few years, we have gained tremendous knowledge about handling
@ -984,12 +984,12 @@ production workloads with Druid and have made a couple of interesting observatio
\paragraph{Query Patterns}
Druid is often used to explore data and generate reports on data. In the
explore use case, the number of queries issued by a single user is much higher
than in the reporting use case. Exploratory queries often involve progressively
adding filters for the same time range to narrow down results. Users tend to
explore short time intervals of recent data. In the generate report use case,
users query for much longer data intervals, but users also already know the
queries they want to issue.
users query for much longer data intervals, but those queries are generally few
and pre-determined.
\paragraph{Multitenancy}
Expensive concurrent queries can be problematic in a multitenant
@ -1000,26 +1000,26 @@ to address these issues. Each historical node is able to prioritize which
segments it needs to scan. Proper query planning is critical for production
workloads. Thankfully, queries for a significant amount of data tend to be for
reporting use cases and can be deprioritized. Users do not expect the same level of
interactivity in this use case as when they are exploring data.
\paragraph{Node failures}
Single node failures are common in distributed environments, but many nodes
failing at once are not. If historical nodes completely fail and do not
recover, their segments need to reassigned, which means we need excess cluster
recover, their segments need to be reassigned, which means we need excess cluster
capacity to load this data. The amount of additional capacity to have at any
time contributes to the cost of running a cluster. From our experiences, it is
extremely rare to see more than 2 nodes completely fail at once and hence, we
leave enough capacity in our cluster to completely reassign the data from 2
historical nodes.
\paragraph{Data Center Outages}
Complete cluster failures are possible, but extremely rare. If Druid is
only deployed in a single data center, it is possible for the entire data
center to fail. In such cases, new machines need to be provisioned. As long as
deep storage is still available, cluster recovery time is network bound as
deep storage is still available, cluster recovery time is network bound, as
historical nodes simply need to redownload every segment from deep storage. We
have experienced such failures in the past, and the recovery time was around
several hours in the AWS ecosystem for several TBs of data.
have experienced such failures in the past, and the recovery time was
several hours in the Amazon AWS ecosystem for several terabytes of data.
\subsection{Operational Monitoring}
Proper monitoring is critical to running a large scale distributed cluster.
@ -1035,20 +1035,20 @@ performance and stability of the production cluster. This dedicated metrics
cluster has allowed us to find numerous production problems, such as gradual
query speed degradations, less than optimally tuned hardware, and various other
system bottlenecks. We also use a metrics cluster to analyze what queries are
made in production and what users are most interested in.
made in production and what aspects of the data users are most interested in.
\subsection{Pairing Druid with a Stream Processor}
At the time of writing, Druid can only understand fully denormalized data
Currently, Druid can only understand fully denormalized data
streams. In order to provide full business logic in production, Druid can be
paired with a stream processor such as Apache Storm \cite{marz2013storm}.
A Storm topology consumes events from a data stream, retains only those that are
“on-time”, and applies any relevant business logic. This could range from
simple transformations, such as id to name lookups, up to complex operations
simple transformations, such as id to name lookups, to complex operations
such as multi-stream joins. The Storm topology forwards the processed event
stream to Druid in real-time. Storm handles the streaming data processing work,
and Druid is used for responding to queries for both real-time and
historical data.
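As an illustration, an id-to-name transformation in such a topology might look
like the following minimal sketch; the field names and lookup table here are
hypothetical.
{\scriptsize\begin{verbatim}
import java.util.HashMap;
import java.util.Map;

public class IdToNameTransform
{
  // hypothetical dimension table kept in memory by the topology
  private final Map<Long, String> idToName = new HashMap<>();

  public IdToNameTransform()
  {
    idToName.put(1L, "homepage");
    idToName.put(2L, "checkout");
  }

  // replaces a numeric page id with a human-readable page name
  public Map<String, Object> apply(Map<String, Object> event)
  {
    Map<String, Object> out = new HashMap<>(event);
    Object id = out.remove("pageId");
    out.put("page", idToName.getOrDefault(id, "unknown"));
    return out;
  }
}
\end{verbatim}}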
\subsection{Multiple Data Center Distribution}
Large scale production outages may not only affect single nodes, but entire
@ -1058,13 +1058,13 @@ exactly replicated across historical nodes in multiple data centers.
Similarly, query preference can be assigned to different tiers. It is possible
to have nodes in one data center act as a primary cluster (and receive all
queries) and have a redundant cluster in another data center. Such a setup may
be desired if one data center is situated much closer to users.
\section{Related Work}
\label{sec:related}
Cattell \cite{cattell2011scalable} maintains a great summary of existing
scalable SQL and NoSQL data stores. Hu \cite{hu2011stream} contributed another
great summary for streaming databases. Druid feature-wise sits somewhere
great summary for streaming databases. Druid, feature-wise, sits somewhere
between Google's Dremel \cite{melnik2010dremel} and PowerDrill
\cite{hall2012processing}. Druid has most of the features implemented in Dremel
(Dremel handles arbitrary nested data structures while Druid only allows for a
@ -1074,30 +1074,30 @@ algorithms mentioned in PowerDrill.
Although Druid builds on many of the same principles as other distributed
columnar data stores \cite{fink2012distributed}, many of these data stores are
designed to be more generic key-value stores \cite{lakshman2010cassandra} and do not
support computation directly in the storage layer. There are also other data
stores designed for some of the same of the data warehousing issues that Druid
is meant to solve. These systems include include in-memory databases such as
support computation directly in the storage layer. There are also other data
stores designed for some of the same data warehousing issues that Druid
is meant to solve. These systems include in-memory databases such as
SAP's HANA \cite{farber2012sap} and VoltDB \cite{voltdb2010voltdb}. These data
stores lack Druid's low latency ingestion characteristics. Druid also has
native analytical features baked in, similar to \cite{paraccel2013}, however,
Druid allows system wide rolling software updates with no downtime.
native analytical features baked in, similar to ParAccel \cite{paraccel2013}; however,
Druid allows system wide rolling software updates with no downtime.
Druid is similar to \cite{stonebraker2005c, cipar2012lazybase} in that it has
Druid is similar to C-Store \cite{stonebraker2005c} and LazyBase \cite{cipar2012lazybase} in that it has
two subsystems, a read-optimized subsystem in the historical nodes and a
write-optimized subsystem in real-time nodes. Real-time nodes are designed to
write-optimized subsystem in the real-time nodes. Real-time nodes are designed to
ingest a high volume of append-heavy data, and do not support data updates.
Unlike the two aforementioned systems, Druid is meant for OLAP transactions and
not OLTP transactions.
Druid's low latency data ingestion features share some similarities with
Trident/Storm \cite{marz2013storm} and Streaming Spark
Trident/Storm \cite{marz2013storm} and Spark Streaming
\cite{zaharia2012discretized}, however, both systems are focused on stream
processing whereas Druid is focused on ingestion and aggregation. Stream
processors are great complements to Druid as a means of pre-processing the data
before the data enters Druid.
There are a class of systems that specialize in queries on top of cluster
computing frameworks. Shark \cite{engle2012shark} is such a system for queries
on top of Spark, and Cloudera's Impala \cite{cloudera2013} is another system
focused on optimizing query performance on top of HDFS. Druid historical nodes
download data locally and only work with native Druid indexes. We believe this
@ -1111,7 +1111,7 @@ stores \cite{macnicol2004sybase}.
\section{Conclusions}
\label{sec:conclusions}
In this paper, we presented Druid, a distributed, column-oriented, real-time
In this paper we presented Druid, a distributed, column-oriented, real-time
analytical data store. Druid is designed to power high performance applications
and is optimized for low query latencies. Druid supports streaming data
ingestion and is fault-tolerant. We discussed Druid benchmarks and
@ -1123,8 +1123,8 @@ as the storage format, query language, and general execution.
\section{Acknowledgements}
\label{sec:acknowledgements}
Druid could not have been built without the help of many great engineers at
Metamarkets and in the community. We want to thank everyone who has
contributed to the Druid codebase for their invaluable support.
% The following two commands are all you need in the
% initial runs of your .tex file to
@ -1136,7 +1136,7 @@ contributed to the Druid codebase for their invaluable support.
% latex bibtex latex latex
% to resolve all references
%Generated by bibtex from your ~.bib file. Run latex,
%then bibtex, then latex twice (to resolve references).
%APPENDIX is optional.

View File

@ -0,0 +1 @@
1234

Binary file not shown.

Binary file not shown.

View File

@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -20,6 +20,8 @@
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.client.cache.Cache;
@ -72,22 +74,25 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
&& strategy != null
&& cacheConfig.isPopulateCache()
// historical only populates distributed cache since the cache lookups are done at broker.
&& !(cache instanceof MapCache) ;
Sequence<T> results = base.run(query);
&& !(cache instanceof MapCache);
if (populateCache) {
Sequence<T> results = base.run(query);
Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
strategy.computeCacheKey(query)
);
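// materialize the result sequence so it can be written to the cache and still returned to the caller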
ArrayList<T> resultAsList = Sequences.toList(results, new ArrayList<T>());
CacheUtil.populate(
cache,
mapper,
key,
Sequences.toList(Sequences.map(results, strategy.prepareForCache()), new ArrayList())
Lists.transform(resultAsList, strategy.prepareForCache())
);
return Sequences.simple(resultAsList);
} else {
return base.run(query);
}
return results;
}
}

View File

@ -0,0 +1,113 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.guice.annotations.Global;
import io.druid.query.Query;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import javax.inject.Inject;
import java.io.IOException;
import java.net.URL;
import java.util.concurrent.atomic.AtomicInteger;
/**
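 * Posts queries over HTTP to an explicitly chosen host and tracks the number of open connections.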
*/
public class RoutingDruidClient<IntermediateType, FinalType>
{
private static final Logger log = new Logger(RoutingDruidClient.class);
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final AtomicInteger openConnections;
private final boolean isSmile;
@Inject
public RoutingDruidClient(
ObjectMapper objectMapper,
@Global HttpClient httpClient
)
{
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
this.openConnections = new AtomicInteger();
}
public int getNumOpenConnections()
{
return openConnections.get();
}
public ListenableFuture<FinalType> run(
String host,
Query query,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
{
final ListenableFuture<FinalType> future;
final String url = String.format("http://%s/druid/v2/", host);
try {
log.debug("Querying url[%s]", url);
future = httpClient
.post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json")
.go(responseHandler);
openConnections.getAndIncrement();
Futures.addCallback(
future,
new FutureCallback<FinalType>()
{
@Override
public void onSuccess(FinalType result)
{
openConnections.getAndDecrement();
}
@Override
public void onFailure(Throwable t)
{
openConnections.getAndDecrement();
}
}
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
return future;
}
}

View File

@ -39,6 +39,8 @@ public interface Cache
public CacheStats getStats();
public boolean isLocal();
public class NamedKey
{
final public String namespace;

View File

@ -149,4 +149,8 @@ public class MapCache implements Cache
retVal.rewind();
return retVal;
}
public boolean isLocal() {
return true;
}
}

View File

@ -278,4 +278,8 @@ public class MemcachedCache implements Cache
// hash keys to keep things under 250 characters for memcached
return memcachedPrefix + ":" + DigestUtils.sha1Hex(key.namespace) + ":" + DigestUtils.sha1Hex(key.key);
}
public boolean isLocal() {
return false;
}
}

View File

@ -0,0 +1,33 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client.selector;
import com.metamx.common.Pair;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
/**
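 * Picks the service name and server discovery selector that should handle a given query.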
*/
public interface HostSelector<T>
{
public String getDefaultServiceName();
public Pair<String, ServerDiscoverySelector> select(Query<T> query);
}

View File

@ -31,17 +31,15 @@ import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Processing;
import io.druid.query.MetricsEmittingExecutorService;
import io.druid.query.PrioritizedExecutorService;
import io.druid.server.DruidProcessingConfig;
import io.druid.server.VMUtils;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -82,37 +80,26 @@ public class DruidProcessingModule implements Module
public StupidPool<ByteBuffer> getIntermediateResultsPool(DruidProcessingConfig config)
{
try {
Class<?> vmClass = Class.forName("sun.misc.VM");
Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null);
long maxDirectMemory = VMUtils.getMaxDirectMemory();
if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) {
log.info("Cannot determine maxDirectMemory from[%s]", maxDirectMemoryObj);
} else {
long maxDirectMemory = ((Number) maxDirectMemoryObj).longValue();
final long memoryNeeded = (long) config.intermediateComputeSizeBytes() * (config.getNumThreads() + 1);
if (maxDirectMemory < memoryNeeded) {
throw new ProvisionException(
String.format(
"Not enough direct memory. Please adjust -XX:MaxDirectMemorySize or druid.computation.buffer.size: "
+ "maxDirectMemory[%,d], memoryNeeded[%,d], druid.computation.buffer.size[%,d], numThreads[%,d]",
maxDirectMemory, memoryNeeded, config.intermediateComputeSizeBytes(), config.getNumThreads()
)
);
}
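// each processing thread needs one intermediate buffer, plus one spare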
final long memoryNeeded = (long) config.intermediateComputeSizeBytes() * (config.getNumThreads() + 1);
if (maxDirectMemory < memoryNeeded) {
throw new ProvisionException(
String.format(
"Not enough direct memory. Please adjust -XX:MaxDirectMemorySize, druid.computation.buffer.size, or druid.processing.numThreads: "
+ "maxDirectMemory[%,d], memoryNeeded[%,d] = druid.computation.buffer.size[%,d] * ( druid.processing.numThreads[%,d] + 1 )",
maxDirectMemory,
memoryNeeded,
config.intermediateComputeSizeBytes(),
config.getNumThreads()
)
);
}
} catch(UnsupportedOperationException e) {
log.info(e.getMessage());
}
catch (ClassNotFoundException e) {
log.info("No VM class, cannot do memory check.");
}
catch (NoSuchMethodException e) {
log.info("VM.maxDirectMemory doesn't exist, cannot do memory check.");
}
catch (InvocationTargetException e) {
log.warn(e, "static method shouldn't throw this");
}
catch (IllegalAccessException e) {
log.warn(e, "public method, shouldn't throw this");
catch(RuntimeException e) {
log.warn(e, e.getMessage());
}
return new IntermediateProcessingBufferPool(config.intermediateComputeSizeBytes());

View File

@ -0,0 +1,238 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.client.RoutingDruidClient;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.Query;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.DateTime;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
/**
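 * Forwards queries asynchronously to the broker host chosen by QueryHostFinder, logging each request and emitting request-time metrics.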
*/
@WebServlet(asyncSupported = true)
public class AsyncQueryForwardingServlet extends HttpServlet
{
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final String DISPATCHED = "dispatched";
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
private final RoutingDruidClient routingDruidClient;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final QueryIDProvider idProvider;
public AsyncQueryForwardingServlet(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
RoutingDruidClient routingDruidClient,
ServiceEmitter emitter,
RequestLogger requestLogger,
QueryIDProvider idProvider
)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
this.routingDruidClient = routingDruidClient;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.idProvider = idProvider;
}
@Override
protected void doPost(
final HttpServletRequest req, final HttpServletResponse resp
) throws ServletException, IOException
{
final long start = System.currentTimeMillis();
Query query = null;
String queryId;
final boolean isSmile = "application/smile".equals(req.getContentType());
ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
OutputStream out = null;
try {
final AsyncContext ctx = req.startAsync(req, resp);
if (req.getAttribute(DISPATCHED) != null) {
return;
}
req.setAttribute(DISPATCHED, true);
resp.setStatus(200);
resp.setContentType("application/x-javascript");
query = objectMapper.readValue(req.getInputStream(), Query.class);
queryId = query.getId();
if (queryId == null) {
queryId = idProvider.next(query);
query = query.withId(queryId);
}
requestLogger.log(
new RequestLogLine(new DateTime(), req.getRemoteAddr(), query)
);
out = resp.getOutputStream();
final OutputStream outputStream = out;
final String host = hostFinder.getHost(query);
final Query theQuery = query;
final String theQueryId = queryId;
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
{
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
{
byte[] bytes = getContentBytes(response.getContent());
if (bytes.length > 0) {
try {
outputStream.write(bytes);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
byte[] bytes = getContentBytes(chunk.getContent());
if (bytes.length > 0) {
try {
clientResponse.getObj().write(bytes);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final long requestTime = System.currentTimeMillis() - start;
log.info("Request time: %d", requestTime);
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser2(theQuery.getDataSource().getName())
.setUser4(theQuery.getType())
.setUser5(theQuery.getIntervals().get(0).toString())
.setUser6(String.valueOf(theQuery.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(theQueryId)
.setUser9(theQuery.getDuration().toPeriod().toStandardMinutes().toString())
.build("request/time", requestTime)
);
final OutputStream obj = clientResponse.getObj();
try {
resp.flushBuffer();
outputStream.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
ctx.dispatch();
}
return ClientResponse.finished(obj);
}
private byte[] getContentBytes(ChannelBuffer content)
{
byte[] contentBytes = new byte[content.readableBytes()];
content.readBytes(contentBytes);
return contentBytes;
}
};
ctx.start(
new Runnable()
{
@Override
public void run()
{
routingDruidClient.run(host, theQuery, responseHandler);
}
}
);
}
catch (Exception e) {
if (!resp.isCommitted()) {
resp.setStatus(500);
resp.resetBuffer();
if (out == null) {
out = resp.getOutputStream();
}
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
}
resp.flushBuffer();
log.makeAlert(e, "Exception handling request")
.addData("query", query)
.addData("peer", req.getRemoteAddr())
.emit();
}
}
}

View File

@ -31,4 +31,12 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig
{
return 1024 * 1024 * 1024;
}
@Override @Config(value = "${base_path}.numThreads")
public int getNumThreads()
{
// default to leaving one core for background tasks
final int processors = Runtime.getRuntime().availableProcessors();
return processors > 1 ? processors - 1 : processors;
}
}

View File

@ -136,8 +136,8 @@ public class QueryResource
.setUser5(query.getIntervals().get(0).toString())
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(req.getRemoteAddr())
.setUser8(queryId)
.setUser9(query.getDuration().toPeriod().toStandardMinutes().toString())
.setUser10(queryId)
.build("request/time", requestTime)
);
}

View File

@ -0,0 +1,51 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import java.lang.reflect.InvocationTargetException;
public class VMUtils
{
public static long getMaxDirectMemory() throws UnsupportedOperationException
{
try {
Class<?> vmClass = Class.forName("sun.misc.VM");
Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null);
if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) {
throw new UnsupportedOperationException(String.format("Cannot determine maxDirectMemory from [%s]", maxDirectMemoryObj));
} else {
return ((Number) maxDirectMemoryObj).longValue();
}
}
catch (ClassNotFoundException e) {
throw new UnsupportedOperationException("No VM class, cannot do memory check.", e);
}
catch (NoSuchMethodException e) {
throw new UnsupportedOperationException("VM.maxDirectMemory doesn't exist, cannot do memory check.", e);
}
catch (InvocationTargetException e) {
throw new RuntimeException("static method shouldn't throw this", e);
}
catch (IllegalAccessException e) {
throw new RuntimeException("public method, shouldn't throw this", e);
}
}
}

View File

@ -26,12 +26,16 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.Pair;
import com.metamx.common.guava.Comparators;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.db.DatabaseSegmentManager;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -86,7 +90,7 @@ public class DatasourcesResource
@QueryParam("simple") String simple
)
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
Response.ResponseBuilder builder = Response.ok();
if (full != null) {
return builder.entity(getDataSources()).build();
} else if (simple != null) {
@ -128,15 +132,71 @@ public class DatasourcesResource
@Path("/{dataSourceName}")
@Produces("application/json")
public Response getTheDataSource(
@PathParam("dataSourceName") final String dataSourceName
@PathParam("dataSourceName") final String dataSourceName,
@QueryParam("full") final String full
)
{
DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
return Response.noContent().build();
}
return Response.ok(dataSource).build();
if (full != null) {
return Response.ok(dataSource).build();
}
Map<String, Object> tiers = Maps.newHashMap();
Map<String, Object> segments = Maps.newHashMap();
Map<String, Map<String, Object>> retVal = ImmutableMap.of(
"tiers", tiers,
"segments", segments
);
int totalSegmentCount = 0;
long totalSegmentSize = 0;
long minTime = Long.MAX_VALUE;
long maxTime = Long.MIN_VALUE;
for (DruidServer druidServer : serverInventoryView.getInventory()) {
DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
if (druidDataSource == null) {
continue;
}
long dataSourceSegmentSize = 0;
for (DataSegment dataSegment : druidDataSource.getSegments()) {
dataSourceSegmentSize += dataSegment.getSize();
if (dataSegment.getInterval().getStartMillis() < minTime) {
minTime = dataSegment.getInterval().getStartMillis();
}
if (dataSegment.getInterval().getEndMillis() > maxTime) {
maxTime = dataSegment.getInterval().getEndMillis();
}
}
// segment stats
totalSegmentCount += druidDataSource.getSegments().size();
totalSegmentSize += dataSourceSegmentSize;
// tier stats
Map<String, Object> tierStats = (Map) tiers.get(druidServer.getTier());
if (tierStats == null) {
tierStats = Maps.newHashMap();
tiers.put(druidServer.getTier(), tierStats);
}
int segmentCount = MapUtils.getInt(tierStats, "segmentCount", 0);
tierStats.put("segmentCount", segmentCount + druidDataSource.getSegments().size());
long segmentSize = MapUtils.getLong(tierStats, "size", 0L);
tierStats.put("size", segmentSize + dataSourceSegmentSize);
}
segments.put("count", totalSegmentCount);
segments.put("size", totalSegmentSize);
segments.put("minTime", new DateTime(minTime));
segments.put("maxTime", new DateTime(maxTime));
return Response.ok(retVal).build();
}
@POST
@ -147,10 +207,10 @@ public class DatasourcesResource
)
{
if (!databaseSegmentManager.enableDatasource(dataSourceName)) {
return Response.status(Response.Status.NOT_FOUND).build();
return Response.noContent().build();
}
return Response.status(Response.Status.OK).build();
return Response.ok().build();
}
@DELETE
@ -163,29 +223,155 @@ public class DatasourcesResource
)
{
if (indexingServiceClient == null) {
return Response.ok().entity(ImmutableMap.of("error", "no indexing service found")).build();
return Response.ok(ImmutableMap.of("error", "no indexing service found")).build();
}
if (kill != null && Boolean.valueOf(kill)) {
try {
indexingServiceClient.killSegments(dataSourceName, new Interval(interval));
}
catch (Exception e) {
return Response.status(Response.Status.NOT_FOUND)
.entity(
ImmutableMap.of(
"error",
"Exception occurred. Are you sure you have an indexing service?"
)
)
return Response.serverError().entity(
ImmutableMap.of(
"error",
"Exception occurred. Are you sure you have an indexing service?"
)
)
.build();
}
} else {
if (!databaseSegmentManager.removeDatasource(dataSourceName)) {
return Response.status(Response.Status.NOT_FOUND).build();
return Response.noContent().build();
}
}
return Response.status(Response.Status.OK).build();
return Response.ok().build();
}
@GET
@Path("/{dataSourceName}/intervals")
@Produces("application/json")
public Response getSegmentDataSourceIntervals(
@PathParam("dataSourceName") String dataSourceName,
@QueryParam("simple") String simple,
@QueryParam("full") String full
)
{
final DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null) {
return Response.noContent().build();
}
final Comparator<Interval> comparator = Comparators.inverse(Comparators.intervalsByStartThenEnd());
if (full != null) {
final Map<Interval, Map<String, Object>> retVal = Maps.newTreeMap(comparator);
for (DataSegment dataSegment : dataSource.getSegments()) {
Map<String, Object> segments = retVal.get(dataSegment.getInterval());
if (segments == null) {
segments = Maps.newHashMap();
retVal.put(dataSegment.getInterval(), segments);
}
Pair<DataSegment, Set<String>> val = getSegment(dataSegment.getIdentifier());
segments.put(dataSegment.getIdentifier(), ImmutableMap.of("metadata", val.lhs, "servers", val.rhs));
}
return Response.ok(retVal).build();
}
if (simple != null) {
final Map<Interval, Map<String, Object>> retVal = Maps.newHashMap();
for (DataSegment dataSegment : dataSource.getSegments()) {
Map<String, Object> properties = retVal.get(dataSegment.getInterval());
if (properties == null) {
properties = Maps.newHashMap();
properties.put("size", dataSegment.getSize());
properties.put("count", 1);
retVal.put(dataSegment.getInterval(), properties);
} else {
properties.put("size", MapUtils.getLong(properties, "size", 0L) + dataSegment.getSize());
properties.put("count", MapUtils.getInt(properties, "count", 0) + 1);
}
}
return Response.ok(retVal).build();
}
final Set<Interval> intervals = Sets.newTreeSet(comparator);
for (DataSegment dataSegment : dataSource.getSegments()) {
intervals.add(dataSegment.getInterval());
}
return Response.ok(intervals).build();
}
@GET
@Path("/{dataSourceName}/intervals/{interval}")
@Produces("application/json")
public Response getSegmentDataSourceSpecificInterval(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("interval") String interval,
@QueryParam("simple") String simple,
@QueryParam("full") String full
)
{
final DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null || interval == null) {
return Response.noContent().build();
}
// parse the interval only after the null checks
final Interval theInterval = new Interval(interval.replace("_", "/"));
final Comparator<Interval> comparator = Comparators.inverse(Comparators.intervalsByStartThenEnd());
if (full != null) {
final Map<Interval, Map<String, Object>> retVal = Maps.newTreeMap(comparator);
for (DataSegment dataSegment : dataSource.getSegments()) {
if (theInterval.contains(dataSegment.getInterval())) {
Map<String, Object> segments = retVal.get(dataSegment.getInterval());
if (segments == null) {
segments = Maps.newHashMap();
retVal.put(dataSegment.getInterval(), segments);
}
Pair<DataSegment, Set<String>> val = getSegment(dataSegment.getIdentifier());
segments.put(dataSegment.getIdentifier(), ImmutableMap.of("metadata", val.lhs, "servers", val.rhs));
}
}
return Response.ok(retVal).build();
}
if (simple != null) {
final Map<Interval, Map<String, Object>> retVal = Maps.newHashMap();
for (DataSegment dataSegment : dataSource.getSegments()) {
if (theInterval.contains(dataSegment.getInterval())) {
Map<String, Object> properties = retVal.get(dataSegment.getInterval());
if (properties == null) {
properties = Maps.newHashMap();
properties.put("size", dataSegment.getSize());
properties.put("count", 1);
retVal.put(dataSegment.getInterval(), properties);
} else {
properties.put("size", MapUtils.getLong(properties, "size", 0L) + dataSegment.getSize());
properties.put("count", MapUtils.getInt(properties, "count", 0) + 1);
}
}
}
return Response.ok(retVal).build();
}
final Set<String> retVal = Sets.newTreeSet(Comparators.inverse(String.CASE_INSENSITIVE_ORDER));
for (DataSegment dataSegment : dataSource.getSegments()) {
if (theInterval.contains(dataSegment.getInterval())) {
retVal.add(dataSegment.getIdentifier());
}
}
return Response.ok(retVal).build();
}
@GET
@ -198,10 +384,10 @@ public class DatasourcesResource
{
DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
return Response.noContent().build();
}
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
Response.ResponseBuilder builder = Response.ok();
if (full != null) {
return builder.entity(dataSource.getSegments()).build();
}
@ -212,7 +398,7 @@ public class DatasourcesResource
new Function<DataSegment, Object>()
{
@Override
public Object apply(@Nullable DataSegment segment)
public Object apply(DataSegment segment)
{
return segment.getIdentifier();
}
@ -231,15 +417,18 @@ public class DatasourcesResource
{
DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
return Response.noContent().build();
}
for (DataSegment segment : dataSource.getSegments()) {
if (segment.getIdentifier().equalsIgnoreCase(segmentId)) {
return Response.status(Response.Status.OK).entity(segment).build();
}
Pair<DataSegment, Set<String>> retVal = getSegment(segmentId);
if (retVal != null) {
return Response.ok(
ImmutableMap.of("metadata", retVal.lhs, "servers", retVal.rhs)
).build();
}
return Response.status(Response.Status.NOT_FOUND).build();
return Response.noContent().build();
}
@DELETE
@ -250,10 +439,10 @@ public class DatasourcesResource
)
{
if (!databaseSegmentManager.removeSegment(dataSourceName, segmentId)) {
return Response.status(Response.Status.NOT_FOUND).build();
return Response.noContent().build();
}
return Response.status(Response.Status.OK).build();
return Response.ok().build();
}
@POST
@ -265,10 +454,27 @@ public class DatasourcesResource
)
{
if (!databaseSegmentManager.enableSegment(segmentId)) {
return Response.status(Response.Status.NOT_FOUND).build();
return Response.noContent().build();
}
return Response.status(Response.Status.OK).build();
return Response.ok().build();
}
@GET
@Path("/{dataSourceName}/tiers")
@Produces("application/json")
public Response getSegmentDataSourceTiers(
@PathParam("dataSourceName") String dataSourceName
)
{
Set<String> retVal = Sets.newHashSet();
for (DruidServer druidServer : serverInventoryView.getInventory()) {
if (druidServer.getDataSource(dataSourceName) != null) {
retVal.add(druidServer.getTier());
}
}
return Response.ok(retVal).build();
}
private DruidDataSource getDataSource(final String dataSourceName)
@ -345,4 +551,23 @@ public class DatasourcesResource
);
return dataSources;
}
private Pair<DataSegment, Set<String>> getSegment(String segmentId)
{
DataSegment theSegment = null;
Set<String> servers = Sets.newHashSet();
for (DruidServer druidServer : serverInventoryView.getInventory()) {
DataSegment currSegment = druidServer.getSegments().get(segmentId);
if (currSegment != null) {
theSegment = currSegment;
servers.add(druidServer.getHost());
}
}
if (theSegment == null) {
return null;
}
return new Pair<>(theSegment, servers);
}
}

View File

@ -48,6 +48,8 @@ public class ServersResource
return new ImmutableMap.Builder<String, Object>()
.put("host", input.getHost())
.put("tier", input.getTier())
.put("type", input.getType())
.put("priority", input.getPriority())
.put("currSize", input.getCurrSize())
.put("maxSize", input.getMaxSize())
.build();

View File

@ -19,18 +19,24 @@
package io.druid.server.http;
import com.google.api.client.util.Lists;
import com.google.api.client.util.Maps;
import com.google.common.base.Function;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
@ -86,4 +92,55 @@ public class TiersResource
return builder.entity(tiers).build();
}
@GET
@Path("/{tierName}")
@Produces("application/json")
public Response getTierDatasources(
@PathParam("tierName") String tierName,
@QueryParam("simple") String simple
)
{
if (simple != null) {
Table<String, Interval, Map<String, Object>> retVal = HashBasedTable.create();
for (DruidServer druidServer : serverInventoryView.getInventory()) {
if (druidServer.getTier().equalsIgnoreCase(tierName)) {
for (DataSegment dataSegment : druidServer.getSegments().values()) {
Map<String, Object> properties = retVal.get(dataSegment.getDataSource(), dataSegment.getInterval());
if (properties == null) {
properties = Maps.newHashMap();
retVal.put(dataSegment.getDataSource(), dataSegment.getInterval(), properties);
}
properties.put("size", MapUtils.getLong(properties, "size", 0L) + dataSegment.getSize());
properties.put("count", MapUtils.getInt(properties, "count", 0) + 1);
}
}
}
return Response.ok(retVal.rowMap()).build();
}
Set<String> retVal = Sets.newHashSet();
for (DruidServer druidServer : serverInventoryView.getInventory()) {
if (druidServer.getTier().equalsIgnoreCase(tierName)) {
retVal.addAll(
Lists.newArrayList(
Iterables.transform(
druidServer.getDataSources(),
new Function<DruidDataSource, String>()
{
@Override
public String apply(DruidDataSource input)
{
return input.getName();
}
}
)
)
);
}
}
return Response.ok(retVal).build();
}
}

View File

@ -49,7 +49,7 @@ import io.druid.server.DruidNode;
import io.druid.server.StatusResource;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import javax.servlet.ServletException;
@ -154,13 +154,11 @@ public class JettyServerModule extends JerseyServletModule
threadPool.setMinThreads(config.getNumThreads());
threadPool.setMaxThreads(config.getNumThreads());
final Server server = new Server();
server.setThreadPool(threadPool);
final Server server = new Server(threadPool);
SelectChannelConnector connector = new SelectChannelConnector();
ServerConnector connector = new ServerConnector(server);
connector.setPort(node.getPort());
connector.setMaxIdleTime(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
connector.setStatsOn(true);
connector.setIdleTimeout(Ints.checkedCast(config.getMaxIdleTime().toStandardDuration().getMillis()));
server.setConnectors(new Connector[]{connector});

View File

@ -31,7 +31,8 @@ public class ServerConfig
{
@JsonProperty
@Min(1)
private int numThreads = Math.max(10, Runtime.getRuntime().availableProcessors() + 1);
// Jetty defaults are whack
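// e.g., with 16 cores: max(10, (16 * 17) / 16 + 2) = max(10, 19) = 19 threads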
private int numThreads = Math.max(10, (Runtime.getRuntime().availableProcessors() * 17) / 16 + 2);
@JsonProperty
@NotNull

View File

@ -57,7 +57,7 @@ public class CoordinatorRuleManager
private final HttpClient httpClient;
private final ObjectMapper jsonMapper;
private final Supplier<TierConfig> config;
private final Supplier<TieredBrokerConfig> config;
private final ServerDiscoverySelector selector;
private final StatusResponseHandler responseHandler;
@ -73,7 +73,7 @@ public class CoordinatorRuleManager
public CoordinatorRuleManager(
@Global HttpClient httpClient,
@Json ObjectMapper jsonMapper,
Supplier<TierConfig> config,
Supplier<TieredBrokerConfig> config,
ServerDiscoverySelector selector
)
{

View File

@ -0,0 +1,98 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
import java.util.concurrent.ConcurrentHashMap;
/**
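 * Resolves the concrete server for a query via TieredBrokerHostSelector, falling back to the last known good server for the service.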
*/
public class QueryHostFinder<T>
{
private static EmittingLogger log = new EmittingLogger(QueryHostFinder.class);
private final TieredBrokerHostSelector hostSelector;
private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<String, Server>();
@Inject
public QueryHostFinder(
TieredBrokerHostSelector hostSelector
)
{
this.hostSelector = hostSelector;
}
public Server findServer(Query<T> query)
{
final Pair<String, ServerDiscoverySelector> selected = hostSelector.select(query);
final String serviceName = selected.lhs;
final ServerDiscoverySelector selector = selected.rhs;
Server server = selector.pick();
if (server == null) {
log.error(
"WTF?! No server found for serviceName[%s]. Using backup",
serviceName
);
server = serverBackup.get(serviceName);
if (server == null) {
log.error(
"WTF?! No backup found for serviceName[%s]. Using default[%s]",
serviceName,
hostSelector.getDefaultServiceName()
);
server = serverBackup.get(hostSelector.getDefaultServiceName());
}
}
if (server != null) {
serverBackup.put(serviceName, server);
}
return server;
}
public String getHost(Query<T> query)
{
Server server = findServer(query);
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
return null;
}
log.debug("Selected [%s]", server.getHost());
return server.getHost();
}
}

View File

@ -1,82 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.http.client.HttpClient;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.guice.annotations.Global;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
/**
*/
public class RouterQuerySegmentWalker implements QuerySegmentWalker
{
private final QueryToolChestWarehouse warehouse;
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final BrokerSelector brokerSelector;
private final TierConfig tierConfig;
@Inject
public RouterQuerySegmentWalker(
QueryToolChestWarehouse warehouse,
ObjectMapper objectMapper,
@Global HttpClient httpClient,
BrokerSelector brokerSelector,
TierConfig tierConfig
)
{
this.warehouse = warehouse;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.brokerSelector = brokerSelector;
this.tierConfig = tierConfig;
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
return makeRunner();
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
return makeRunner();
}
private <T> QueryRunner<T> makeRunner()
{
return new TierAwareQueryRunner<T>(
warehouse,
objectMapper,
httpClient,
brokerSelector,
tierConfig
);
}
}

View File

@ -1,121 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.metamx.common.Pair;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import io.druid.client.DirectDruidClient;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChestWarehouse;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class TierAwareQueryRunner<T> implements QueryRunner<T>
{
private static EmittingLogger log = new EmittingLogger(TierAwareQueryRunner.class);
private final QueryToolChestWarehouse warehouse;
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final BrokerSelector<T> brokerSelector;
private final TierConfig tierConfig;
private final ConcurrentHashMap<String, Server> serverBackup = new ConcurrentHashMap<String, Server>();
public TierAwareQueryRunner(
QueryToolChestWarehouse warehouse,
ObjectMapper objectMapper,
HttpClient httpClient,
BrokerSelector<T> brokerSelector,
TierConfig tierConfig
)
{
this.warehouse = warehouse;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.brokerSelector = brokerSelector;
this.tierConfig = tierConfig;
}
public Server findServer(Query<T> query)
{
final Pair<String, ServerDiscoverySelector> selected = brokerSelector.select(query);
final String brokerServiceName = selected.lhs;
final ServerDiscoverySelector selector = selected.rhs;
Server server = selector.pick();
if (server == null) {
log.error(
"WTF?! No server found for brokerServiceName[%s]. Using backup",
brokerServiceName
);
server = serverBackup.get(brokerServiceName);
if (server == null) {
log.makeAlert(
"WTF?! No backup found for brokerServiceName[%s]. Using default[%s]",
brokerServiceName,
tierConfig.getDefaultBrokerServiceName()
).emit();
server = serverBackup.get(tierConfig.getDefaultBrokerServiceName());
}
} else {
serverBackup.put(brokerServiceName, server);
}
return server;
}
@Override
public Sequence<T> run(Query<T> query)
{
Server server = findServer(query);
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
return Sequences.empty();
}
QueryRunner<T> client = new DirectDruidClient<T>(
warehouse,
objectMapper,
httpClient,
server.getHost()
);
return client.run(query);
}
}

View File

@ -29,7 +29,7 @@ import java.util.LinkedHashMap;
/**
*/
public class TierConfig
public class TieredBrokerConfig
{
@JsonProperty
@NotNull

View File

@ -20,20 +20,20 @@
package io.druid.server.router;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.client.selector.HostSelector;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import java.util.List;
@ -42,23 +42,23 @@ import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class BrokerSelector<T>
public class TieredBrokerHostSelector<T> implements HostSelector<T>
{
private static EmittingLogger log = new EmittingLogger(BrokerSelector.class);
private static EmittingLogger log = new EmittingLogger(TieredBrokerHostSelector.class);
private final CoordinatorRuleManager ruleManager;
private final TierConfig tierConfig;
private final TieredBrokerConfig tierConfig;
private final ServerDiscoveryFactory serverDiscoveryFactory;
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<String, ServerDiscoverySelector>();
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<>();
private final Object lock = new Object();
private volatile boolean started = false;
@Inject
public BrokerSelector(
public TieredBrokerHostSelector(
CoordinatorRuleManager ruleManager,
TierConfig tierConfig,
TieredBrokerConfig tierConfig,
ServerDiscoveryFactory serverDiscoveryFactory
)
{
@ -112,6 +112,12 @@ public class BrokerSelector<T>
}
}
@Override
public String getDefaultServiceName()
{
return tierConfig.getDefaultBrokerServiceName();
}
public Pair<String, ServerDiscoverySelector> select(final Query<T> query)
{
synchronized (lock) {
@ -120,35 +126,46 @@ public class BrokerSelector<T>
}
}
List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
String brokerServiceName = null;
// find the rule that can apply to the entire set of intervals
DateTime now = new DateTime();
int lastRulePosition = -1;
LoadRule baseRule = null;
// Somewhat janky way of always selecting highest priority broker for this type of query
if (query instanceof TimeBoundaryQuery) {
brokerServiceName = Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
);
}
for (Interval interval : query.getIntervals()) {
int currRulePosition = 0;
for (Rule rule : rules) {
if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) {
lastRulePosition = currRulePosition;
baseRule = (LoadRule) rule;
if (brokerServiceName == null) {
List<Rule> rules = ruleManager.getRulesWithDefault((query.getDataSource()).getName());
// find the rule that can apply to the entire set of intervals
DateTime now = new DateTime();
int lastRulePosition = -1;
LoadRule baseRule = null;
for (Interval interval : query.getIntervals()) {
int currRulePosition = 0;
for (Rule rule : rules) {
if (rule instanceof LoadRule && currRulePosition > lastRulePosition && rule.appliesTo(interval, now)) {
lastRulePosition = currRulePosition;
baseRule = (LoadRule) rule;
break;
}
currRulePosition++;
}
}
if (baseRule == null) {
return null;
}
// in the baseRule, find the broker of highest priority
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
brokerServiceName = entry.getValue();
break;
}
currRulePosition++;
}
}
if (baseRule == null) {
return null;
}
// in the baseRule, find the broker of highest priority
String brokerServiceName = null;
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
if (baseRule.getTieredReplicants().containsKey(entry.getKey())) {
brokerServiceName = entry.getValue();
break;
}
}

View File

@ -0,0 +1,185 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.client;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.granularity.AllGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNResultValue;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class CachePopulatingQueryRunnerTest
{
private static final List<AggregatorFactory> AGGS = Arrays.asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("imps", "imps"),
new LongSumAggregatorFactory("impers", "imps")
);
@Test
public void testCachePopulatingQueryRunnerResourceClosing() throws Exception
{
Iterable<Result<TopNResultValue>> expectedRes = makeTopNResults(
new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
);
final TopNQueryBuilder builder = new TopNQueryBuilder()
.dataSource("ds")
.dimension("top_dim")
.metric("imps")
.threshold(3)
.intervals("2011-01-05/2011-01-10")
.aggregators(AGGS)
.granularity(AllGranularity.ALL);
final AssertingClosable closable = new AssertingClosable();
final Sequence resultSeq = new ResourceClosingSequence(
Sequences.simple(expectedRes), closable
)
{
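// Verify the underlying resource is still open each time the sequence is consumed.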
@Override
public Yielder toYielder(Object initValue, YieldingAccumulator accumulator)
{
Assert.assertFalse(closable.isClosed());
return super.toYielder(
initValue,
accumulator
);
}
};
Cache cache = EasyMock.createMock(Cache.class);
// the cache populator skips populating local caches, so a dummy cache suffices
EasyMock.expect(cache.isLocal()).andReturn(false);
CachePopulatingQueryRunner runner = new CachePopulatingQueryRunner(
"segment",
new SegmentDescriptor(new Interval("2011/2012"), "version", 0),
new DefaultObjectMapper(),
cache,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
new QueryRunner()
{
@Override
public Sequence run(Query query)
{
return resultSeq;
}
},
new CacheConfig()
);
Sequence res = runner.run(builder.build());
// base sequence is not closed yet
Assert.assertFalse(closable.isClosed());
ArrayList results = Sequences.toList(res, new ArrayList());
Assert.assertTrue(closable.isClosed());
Assert.assertEquals(expectedRes, results);
}
private Iterable<Result<TopNResultValue>> makeTopNResults(Object... objects)
{
List<Result<TopNResultValue>> retVal = Lists.newArrayList();
int index = 0;
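// objects is a flat varargs stream: a DateTime timestamp followed by (dim, rows, imps) triples.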
while (index < objects.length) {
DateTime timestamp = (DateTime) objects[index++];
List<Map<String, Object>> values = Lists.newArrayList();
while (index < objects.length && !(objects[index] instanceof DateTime)) {
if (objects.length - index < 3) {
throw new ISE(
"expect 3 values for each entry in the top list, had %d values left.", objects.length - index
);
}
final double imps = ((Number) objects[index + 2]).doubleValue();
final double rows = ((Number) objects[index + 1]).doubleValue();
values.add(
ImmutableMap.of(
"top_dim", objects[index],
"rows", rows,
"imps", imps,
"impers", imps,
"avg_imps_per_row", imps / rows
)
);
index += 3;
}
retVal.add(new Result<>(timestamp, new TopNResultValue(values)));
}
return retVal;
}
private static class AssertingClosable implements Closeable
{
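// Tolerates exactly one close(); a second close fails the test.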
private final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public void close() throws IOException
{
Assert.assertFalse(closed.get());
Assert.assertTrue(closed.compareAndSet(false, true));
}
public boolean isClosed()
{
return closed.get();
}
}
}

View File

@ -40,20 +40,20 @@ import java.util.LinkedHashMap;
/**
*/
public class TierAwareQueryRunnerTest
public class QueryHostFinderTest
{
private ServerDiscoverySelector selector;
private BrokerSelector brokerSelector;
private TierConfig config;
private TieredBrokerHostSelector brokerSelector;
private TieredBrokerConfig config;
private Server server;
@Before
public void setUp() throws Exception
{
selector = EasyMock.createMock(ServerDiscoverySelector.class);
brokerSelector = EasyMock.createMock(BrokerSelector.class);
brokerSelector = EasyMock.createMock(TieredBrokerHostSelector.class);
config = new TierConfig()
config = new TieredBrokerConfig()
{
@Override
public LinkedHashMap<String, String> getTierToBrokerMap()
@ -118,12 +118,8 @@ public class TierAwareQueryRunnerTest
EasyMock.expect(selector.pick()).andReturn(server).once();
EasyMock.replay(selector);
TierAwareQueryRunner queryRunner = new TierAwareQueryRunner(
null,
null,
null,
brokerSelector,
config
QueryHostFinder queryRunner = new QueryHostFinder(
brokerSelector
);
Server server = queryRunner.findServer(

View File

@ -50,11 +50,11 @@ import java.util.List;
/**
*/
public class BrokerSelectorTest
public class TieredBrokerHostSelectorTest
{
private ServerDiscoveryFactory factory;
private ServerDiscoverySelector selector;
private BrokerSelector brokerSelector;
private TieredBrokerHostSelector brokerSelector;
@Before
public void setUp() throws Exception
@ -62,9 +62,9 @@ public class BrokerSelectorTest
factory = EasyMock.createMock(ServerDiscoveryFactory.class);
selector = EasyMock.createMock(ServerDiscoverySelector.class);
brokerSelector = new BrokerSelector(
brokerSelector = new TieredBrokerHostSelector(
new TestRuleManager(null, null, null, null),
new TierConfig()
new TieredBrokerConfig()
{
@Override
public LinkedHashMap<String, String> getTierToBrokerMap()
@ -112,11 +112,12 @@ public class BrokerSelectorTest
public void testBasicSelect() throws Exception
{
String brokerName = (String) brokerSelector.select(
new TimeBoundaryQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01"))),
null
)
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity("all")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.intervals(Arrays.<Interval>asList(new Interval("2011-08-31/2011-09-01")))
.build()
).lhs;
Assert.assertEquals("coldBroker", brokerName);
@ -127,11 +128,12 @@ public class BrokerSelectorTest
public void testBasicSelect2() throws Exception
{
String brokerName = (String) brokerSelector.select(
new TimeBoundaryQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2013-08-31/2013-09-01"))),
null
)
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity("all")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.intervals(Arrays.<Interval>asList(new Interval("2013-08-31/2013-09-01")))
.build()
).lhs;
Assert.assertEquals("hotBroker", brokerName);
@ -141,11 +143,12 @@ public class BrokerSelectorTest
public void testSelectMatchesNothing() throws Exception
{
Pair retVal = brokerSelector.select(
new TimeBoundaryQuery(
new TableDataSource("test"),
new MultipleIntervalSegmentSpec(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01"))),
null
)
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity("all")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.intervals(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01")))
.build()
);
Assert.assertEquals(null, retVal);
@ -199,7 +202,7 @@ public class BrokerSelectorTest
public TestRuleManager(
@Global HttpClient httpClient,
@Json ObjectMapper jsonMapper,
Supplier<TierConfig> config,
Supplier<TieredBrokerConfig> config,
ServerDiscoverySelector selector
)
{

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.75-SNAPSHOT</version>
<version>0.6.83-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -55,7 +55,7 @@ import java.util.List;
*/
@Command(
name = "broker",
description = "Runs a broker node, see http://druid.io/docs/0.6.73/Broker.html for a description"
description = "Runs a broker node, see http://druid.io/docs/0.6.81/Broker.html for a description"
)
public class CliBroker extends ServerRunnable
{

View File

@ -66,7 +66,7 @@ import java.util.List;
*/
@Command(
name = "coordinator",
description = "Runs the Coordinator, see http://druid.io/docs/0.6.73/Coordinator.html for a description."
description = "Runs the Coordinator, see http://druid.io/docs/0.6.81/Coordinator.html for a description."
)
public class CliCoordinator extends ServerRunnable
{

View File

@ -41,7 +41,7 @@ import java.util.List;
*/
@Command(
name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.73/Batch-ingestion.html for a description."
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.81/Batch-ingestion.html for a description."
)
public class CliHadoopIndexer implements Runnable
{

View File

@ -46,7 +46,7 @@ import java.util.List;
*/
@Command(
name = "historical",
description = "Runs a Historical node, see http://druid.io/docs/0.6.73/Historical.html for a description"
description = "Runs a Historical node, see http://druid.io/docs/0.6.81/Historical.html for a description"
)
public class CliHistorical extends ServerRunnable
{

View File

@ -93,7 +93,7 @@ import java.util.List;
*/
@Command(
name = "overlord",
description = "Runs an Overlord node, see http://druid.io/docs/0.6.73/Indexing-Service.html for a description"
description = "Runs an Overlord node, see http://druid.io/docs/0.6.81/Indexing-Service.html for a description"
)
public class CliOverlord extends ServerRunnable
{

View File

@ -30,7 +30,7 @@ import java.util.List;
*/
@Command(
name = "realtime",
description = "Runs a realtime node, see http://druid.io/docs/0.6.73/Realtime.html for a description"
description = "Runs a realtime node, see http://druid.io/docs/0.6.81/Realtime.html for a description"
)
public class CliRealtime extends ServerRunnable
{

View File

@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
*/
@Command(
name = "realtime",
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.73/Realtime.html for a description"
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/latest/Realtime.html for a description"
)
public class CliRealtimeExample extends ServerRunnable
{

View File

@ -25,24 +25,20 @@ import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.RoutingDruidClient;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.QueryResource;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.router.BrokerSelector;
import io.druid.server.router.CoordinatorRuleManager;
import io.druid.server.router.RouterQuerySegmentWalker;
import io.druid.server.router.TierConfig;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.TieredBrokerConfig;
import io.druid.server.router.TieredBrokerHostSelector;
import org.eclipse.jetty.server.Server;
import java.util.List;
@ -71,19 +67,16 @@ public class CliRouter extends ServerRunnable
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.router", TierConfig.class);
JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class);
binder.bind(CoordinatorRuleManager.class);
LifecycleModule.register(binder, CoordinatorRuleManager.class);
binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class);
binder.bind(QueryHostFinder.class).in(LazySingleton.class);
binder.bind(RoutingDruidClient.class).in(LazySingleton.class);
binder.bind(BrokerSelector.class).in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(RouterQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class);
LifecycleModule.register(binder, QueryResource.class);
binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class);
LifecycleModule.register(binder, Server.class);
DiscoveryModule.register(binder, Self.class);
@ -92,7 +85,7 @@ public class CliRouter extends ServerRunnable
@Provides
@ManageLifecycle
public ServerDiscoverySelector getCoordinatorServerDiscoverySelector(
TierConfig config,
TieredBrokerConfig config,
ServerDiscoveryFactory factory
)

View File

@ -38,16 +38,13 @@ public class QueryJettyServerInitializer implements JettyServerInitializer
@Override
public void initialize(Server server, Injector injector)
{
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.setResourceBase("/");
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()});
handlerList.setHandlers(new Handler[]{root, new DefaultHandler()});
server.setHandler(handlerList);
}
}

View File

@ -0,0 +1,104 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.RoutingDruidClient;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.server.AsyncQueryForwardingServlet;
import io.druid.server.QueryIDProvider;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
/**
*/
public class RouterJettyServerInitializer implements JettyServerInitializer
{
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
private final RoutingDruidClient routingDruidClient;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final QueryIDProvider idProvider;
@Inject
public RouterJettyServerInitializer(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
RoutingDruidClient routingDruidClient,
ServiceEmitter emitter,
RequestLogger requestLogger,
QueryIDProvider idProvider
)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
this.routingDruidClient = routingDruidClient;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.idProvider = idProvider;
}
@Override
public void initialize(Server server, Injector injector)
{
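// Queries to /druid/v2/* are forwarded asynchronously to the broker chosen by hostFinder;
// all other paths fall through to the default servlet and Guice filter below.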
final ServletContextHandler queries = new ServletContextHandler(ServletContextHandler.SESSIONS);
queries.addServlet(
new ServletHolder(
new AsyncQueryForwardingServlet(
jsonMapper,
smileMapper,
hostFinder,
routingDruidClient,
emitter,
requestLogger,
idProvider
)
), "/druid/v2/*"
);
queries.addFilter(GzipFilter.class, "/druid/v2/*", null);
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
final HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{queries, root, new DefaultHandler()});
server.setHandler(handlerList);
}
}