mirror of
https://github.com/apache/druid.git
synced 2025-02-17 15:35:56 +00:00
Merge branch 'master' into igalDruid
This commit is contained in:
commit
c4d06faa05
2
build.sh
2
build.sh
@ -30,4 +30,4 @@ echo "For examples, see: "
|
||||
echo " "
|
||||
ls -1 examples/*/*sh
|
||||
echo " "
|
||||
echo "See also http://druid.io/docs/0.6.65"
|
||||
echo "See also http://druid.io/docs/0.6.72"
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -26,6 +26,11 @@ druid.service=broker
|
||||
druid.port=8080
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
# Change these to make Druid faster
|
||||
druid.processing.buffer.sizeBytes=100000000
|
||||
druid.processing.numThreads=1
|
||||
|
||||
```
|
||||
|
||||
Production Configs
|
||||
@ -101,7 +106,7 @@ The broker module uses several of the default modules in [Configuration](Configu
|
||||
|
||||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`druid.broker.cache.sizeInBytes`|Maximum size of the cache. If this is zero, cache is disabled.|0|
|
||||
|`druid.broker.cache.sizeInBytes`|Maximum size of the cache. If this is zero, cache is disabled.|10485760 (10MB)|
|
||||
|`druid.broker.cache.initialSize`|The initial size of the cache in bytes.|500000|
|
||||
|`druid.broker.cache.logEvictionCount`|If this is non-zero, there will be an eviction of entries.|0|
|
||||
|
||||
|
@ -193,7 +193,7 @@ This module is required by nodes that can serve queries.
|
||||
|
||||
|Property|Description|Default|
|
||||
|--------|-----------|-------|
|
||||
|`druid.query.chunkPeriod`|Long interval queries may be broken into shorter interval queries.|P1M|
|
||||
|`druid.query.chunkPeriod`|Long interval queries may be broken into shorter interval queries.|0|
|
||||
|
||||
#### GroupBy Query Config
|
||||
|
||||
|
@ -19,13 +19,13 @@ Clone Druid and build it:
|
||||
git clone https://github.com/metamx/druid.git druid
|
||||
cd druid
|
||||
git fetch --tags
|
||||
git checkout druid-0.6.65
|
||||
git checkout druid-0.6.72
|
||||
./build.sh
|
||||
```
|
||||
|
||||
### Downloading the DSK (Druid Standalone Kit)
|
||||
|
||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.65-bin.tar.gz) a stand-alone tarball and run it:
|
||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz) a stand-alone tarball and run it:
|
||||
|
||||
``` bash
|
||||
tar -xzf druid-services-0.X.X-bin.tar.gz
|
||||
|
@ -66,7 +66,7 @@ druid.host=#{IP_ADDR}:8080
|
||||
druid.port=8080
|
||||
druid.service=druid/prod/indexer
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.65"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
@ -115,7 +115,7 @@ druid.host=#{IP_ADDR}:8080
|
||||
druid.port=8080
|
||||
druid.service=druid/prod/worker
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.65","io.druid.extensions:druid-kafka-seven:0.6.65"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
@ -27,7 +27,7 @@ druid.host=localhost
|
||||
druid.service=realtime
|
||||
druid.port=8083
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.65"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.72"]
|
||||
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
|
||||
druid.port=8080
|
||||
druid.service=druid/prod/realtime
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.65","io.druid.extensions:druid-kafka-seven:0.6.65"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
@ -152,10 +152,7 @@ The indexing service can also run real-time tasks. These tasks effectively trans
|
||||
"intermediatePersistPeriod": "PT10m"
|
||||
},
|
||||
"windowPeriod": "PT10m",
|
||||
"segmentGranularity": "hour",
|
||||
"rejectionPolicy": {
|
||||
"type": "messageTime"
|
||||
}
|
||||
"segmentGranularity": "hour"
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
||||
|
||||
### Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.65-bin.tar.gz). Download this file to a directory of your choosing.
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz). Download this file to a directory of your choosing.
|
||||
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
||||
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
|
||||
Not too lost so far right? That's great! If you cd into the directory:
|
||||
|
||||
```
|
||||
cd druid-services-0.6.65
|
||||
cd druid-services-0.6.72
|
||||
```
|
||||
|
||||
You should see a bunch of files:
|
||||
|
@ -160,13 +160,15 @@ You should be comfortable starting Druid nodes at this point. If not, it may be
|
||||
"segmentGranularity": "hour",
|
||||
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
|
||||
"rejectionPolicy": {
|
||||
"type": "none"
|
||||
"type": "test"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Note: This config uses a "test" rejection policy which will accept all events and timely hand off, however, we strongly recommend you do not use this in production. Using this rejection policy, segments for events for the same time range will be overridden.
|
||||
|
||||
3. Let's copy and paste some data into the Kafka console producer
|
||||
|
||||
```json
|
||||
|
@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
|
||||
|
||||
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
|
||||
|
||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.65-bin.tar.gz)
|
||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz)
|
||||
|
||||
and untar the contents within by issuing:
|
||||
|
||||
@ -149,7 +149,7 @@ druid.port=8081
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.65"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72"]
|
||||
|
||||
# Dummy read only AWS account (used to download example data)
|
||||
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||
@ -240,7 +240,7 @@ druid.port=8083
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.65","io.druid.extensions:druid-kafka-seven:0.6.65"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72"]
|
||||
|
||||
# Change this config to db to hand off to the rest of the Druid cluster
|
||||
druid.publish.type=noop
|
||||
@ -251,6 +251,9 @@ druid.publish.type=noop
|
||||
# druid.db.connector.password=diurd
|
||||
|
||||
druid.processing.buffer.sizeBytes=100000000
|
||||
druid.processing.numThreads=1
|
||||
|
||||
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
|
||||
```
|
||||
|
||||
Next Steps
|
||||
|
@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
||||
|
||||
h3. Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.65-bin.tar.gz)
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz)
|
||||
Download this file to a directory of your choosing.
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
||||
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
|
||||
Not too lost so far right? That's great! If you cd into the directory:
|
||||
|
||||
```
|
||||
cd druid-services-0.6.65
|
||||
cd druid-services-0.6.72
|
||||
```
|
||||
|
||||
You should see a bunch of files:
|
||||
|
@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
|
||||
|
||||
h3. Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.65-bin.tar.gz.
|
||||
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.72-bin.tar.gz.
|
||||
Download this bad boy to a directory of your choosing.
|
||||
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
@ -53,7 +53,7 @@
|
||||
"segmentGranularity": "hour",
|
||||
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
|
||||
"rejectionPolicy": {
|
||||
"type": "messageTime"
|
||||
"type": "test"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -43,6 +43,6 @@
|
||||
"windowPeriod" : "PT5m",
|
||||
"segmentGranularity":"hour",
|
||||
"basePersistDirectory" : "/tmp/realtime/basePersist",
|
||||
"rejectionPolicy": { "type": "messageTime" }
|
||||
"rejectionPolicy": { "type": "test" }
|
||||
}
|
||||
}]
|
||||
|
@ -2,4 +2,8 @@ druid.host=localhost
|
||||
druid.service=broker
|
||||
druid.port=8080
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
# Change these to make Druid faster
|
||||
druid.processing.buffer.sizeBytes=100000000
|
||||
druid.processing.numThreads=1
|
||||
|
@ -4,7 +4,7 @@ druid.port=8081
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.65"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.72"]
|
||||
|
||||
# Dummy read only AWS account (used to download example data)
|
||||
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||
|
@ -4,7 +4,7 @@ druid.port=8083
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.65","io.druid.extensions:druid-kafka-seven:0.6.65","io.druid.extensions:druid-rabbitmq:0.6.65"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.72","io.druid.extensions:druid-kafka-seven:0.6.72","io.druid.extensions:druid-rabbitmq:0.6.72"]
|
||||
|
||||
# Change this config to db to hand off to the rest of the Druid cluster
|
||||
druid.publish.type=noop
|
||||
@ -16,3 +16,5 @@ druid.publish.type=noop
|
||||
|
||||
druid.processing.buffer.sizeBytes=100000000
|
||||
druid.processing.numThreads=1
|
||||
|
||||
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
21
pom.xml
21
pom.xml
@ -23,14 +23,14 @@
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
<name>druid</name>
|
||||
<description>druid</description>
|
||||
<scm>
|
||||
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
|
||||
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
|
||||
<url>http://www.github.com/metamx/druid</url>
|
||||
<tag>${project.artifactId}-${project.version}</tag>
|
||||
<tag>druid-0.6.72-SNAPSHOT</tag>
|
||||
</scm>
|
||||
|
||||
<prerequisites>
|
||||
@ -535,8 +535,21 @@
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>2.4</version>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-release-plugin</artifactId>
|
||||
<version>2.4.2</version>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.maven.scm</groupId>
|
||||
<artifactId>maven-scm-provider-gitexe</artifactId>
|
||||
<!-- This version is necessary for use with git version 1.8.5 and above -->
|
||||
<version>1.8.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
|
||||
<repositories>
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -50,6 +50,10 @@ public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
|
||||
@Override
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
{
|
||||
if (period.getMillis() == 0) {
|
||||
return baseRunner.run(query);
|
||||
}
|
||||
|
||||
return Sequences.concat(
|
||||
FunctionalIterable
|
||||
.create(query.getIntervals())
|
||||
|
@ -69,6 +69,11 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
|
||||
public Sequence<T> run(final Query<T> query)
|
||||
{
|
||||
final ServiceMetricEvent.Builder builder = builderFn.apply(query);
|
||||
String queryId = query.getId();
|
||||
if (queryId == null) {
|
||||
queryId = "";
|
||||
}
|
||||
builder.setUser8(queryId);
|
||||
|
||||
return new Sequence<T>()
|
||||
{
|
||||
|
@ -27,7 +27,7 @@ import org.joda.time.Period;
|
||||
public class QueryConfig
|
||||
{
|
||||
@JsonProperty
|
||||
private Period chunkPeriod = Period.months(1);
|
||||
private Period chunkPeriod = new Period();
|
||||
|
||||
public Period getChunkPeriod()
|
||||
{
|
||||
|
@ -39,13 +39,13 @@ public class ConstantPostAggregator implements PostAggregator
|
||||
@JsonCreator
|
||||
public ConstantPostAggregator(
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("value") Number constantValue
|
||||
@JsonProperty("value") Number constantValue,
|
||||
@JsonProperty("constantValue") Number backwardsCompatibleValue
|
||||
)
|
||||
{
|
||||
// only value should be required for constants
|
||||
Preconditions.checkNotNull(constantValue, "Constant value must not be null");
|
||||
this.name = name;
|
||||
this.constantValue = constantValue;
|
||||
this.constantValue = constantValue == null ? backwardsCompatibleValue : constantValue;
|
||||
Preconditions.checkNotNull(this.constantValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -98,20 +98,26 @@ public class ConstantPostAggregator implements PostAggregator
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ConstantPostAggregator that = (ConstantPostAggregator) o;
|
||||
|
||||
if (constantValue != null && that.constantValue != null) {
|
||||
if (constantValue.doubleValue() != that.constantValue.doubleValue())
|
||||
if (constantValue.doubleValue() != that.constantValue.doubleValue()) {
|
||||
return false;
|
||||
}
|
||||
else if (constantValue != that.constantValue) {
|
||||
}
|
||||
} else if (constantValue != that.constantValue) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (name != null ? !name.equals(that.name) : that.name != null) return false;
|
||||
if (name != null ? !name.equals(that.name) : that.name != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -123,4 +129,5 @@ public class ConstantPostAggregator implements PostAggregator
|
||||
result = 31 * result + (constantValue != null ? constantValue.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -169,8 +169,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||
.setUser5(Joiner.on(",").join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
|
||||
.setUser9(Minutes.minutes(numMinutes).toString())
|
||||
.setUser10(query.getId());
|
||||
.setUser9(Minutes.minutes(numMinutes).toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -151,8 +151,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
|
||||
.setUser4(query.getType())
|
||||
.setUser5(Joiner.on(",").join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser9(Minutes.minutes(numMinutes).toString())
|
||||
.setUser10(query.getId());
|
||||
.setUser9(Minutes.minutes(numMinutes).toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -125,8 +125,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
|
||||
.setUser4("search")
|
||||
.setUser5(COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser9(Minutes.minutes(numMinutes).toString())
|
||||
.setUser10(query.getId());
|
||||
.setUser9(Minutes.minutes(numMinutes).toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -127,8 +127,7 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
|
||||
.setUser4("Select")
|
||||
.setUser5(COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser9(Minutes.minutes(numMinutes).toString())
|
||||
.setUser10(query.getId());
|
||||
.setUser9(Minutes.minutes(numMinutes).toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -119,8 +119,7 @@ public class TimeBoundaryQueryQueryToolChest
|
||||
return new ServiceMetricEvent.Builder()
|
||||
.setUser2(query.getDataSource().toString())
|
||||
.setUser4(query.getType())
|
||||
.setUser6("false")
|
||||
.setUser10(query.getId());
|
||||
.setUser6("false");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -128,8 +128,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
|
||||
.setUser5(COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
|
||||
.setUser9(Minutes.minutes(numMinutes).toString())
|
||||
.setUser10(query.getId());
|
||||
.setUser9(Minutes.minutes(numMinutes).toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -133,8 +133,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
||||
.setUser5(COMMA_JOIN.join(query.getIntervals()))
|
||||
.setUser6(String.valueOf(query.hasFilters()))
|
||||
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
|
||||
.setUser9(Minutes.minutes(numMinutes).toString())
|
||||
.setUser10(query.getId());
|
||||
.setUser9(Minutes.minutes(numMinutes).toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -119,7 +119,7 @@ public class QueriesTest
|
||||
"+",
|
||||
Arrays.asList(
|
||||
new FieldAccessPostAggregator("idx", "idx"),
|
||||
new ConstantPostAggregator("const", 1)
|
||||
new ConstantPostAggregator("const", 1, null)
|
||||
)
|
||||
),
|
||||
new ArithmeticPostAggregator(
|
||||
@ -127,7 +127,7 @@ public class QueriesTest
|
||||
"-",
|
||||
Arrays.asList(
|
||||
new FieldAccessPostAggregator("rev", "rev"),
|
||||
new ConstantPostAggregator("const", 1)
|
||||
new ConstantPostAggregator("const", 1, null)
|
||||
)
|
||||
)
|
||||
)
|
||||
@ -173,7 +173,7 @@ public class QueriesTest
|
||||
"+",
|
||||
Arrays.asList(
|
||||
new FieldAccessPostAggregator("idx", "idx"),
|
||||
new ConstantPostAggregator("const", 1)
|
||||
new ConstantPostAggregator("const", 1, null)
|
||||
)
|
||||
),
|
||||
new ArithmeticPostAggregator(
|
||||
@ -181,7 +181,7 @@ public class QueriesTest
|
||||
"-",
|
||||
Arrays.asList(
|
||||
new FieldAccessPostAggregator("rev", "rev2"),
|
||||
new ConstantPostAggregator("const", 1)
|
||||
new ConstantPostAggregator("const", 1, null)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -67,7 +67,7 @@ public class QueryRunnerTestHelper
|
||||
"uniques",
|
||||
"quality_uniques"
|
||||
);
|
||||
public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
|
||||
public static final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
|
||||
public static final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
|
||||
public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
|
||||
public static final ArithmeticPostAggregator addRowsIndexConstant =
|
||||
|
@ -48,7 +48,7 @@ public class ArithmeticPostAggregatorTest
|
||||
List<PostAggregator> postAggregatorList =
|
||||
Lists.newArrayList(
|
||||
new ConstantPostAggregator(
|
||||
"roku", 6
|
||||
"roku", 6, null
|
||||
),
|
||||
new FieldAccessPostAggregator(
|
||||
"rows", "rows"
|
||||
@ -79,7 +79,7 @@ public class ArithmeticPostAggregatorTest
|
||||
List<PostAggregator> postAggregatorList =
|
||||
Lists.newArrayList(
|
||||
new ConstantPostAggregator(
|
||||
"roku", 6
|
||||
"roku", 6, null
|
||||
),
|
||||
new FieldAccessPostAggregator(
|
||||
"rows", "rows"
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package io.druid.query.aggregation.post;
|
||||
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -33,11 +34,11 @@ public class ConstantPostAggregatorTest
|
||||
{
|
||||
ConstantPostAggregator constantPostAggregator;
|
||||
|
||||
constantPostAggregator = new ConstantPostAggregator("shichi", 7);
|
||||
constantPostAggregator = new ConstantPostAggregator("shichi", 7, null);
|
||||
Assert.assertEquals(7, constantPostAggregator.compute(null));
|
||||
constantPostAggregator = new ConstantPostAggregator("rei", 0.0);
|
||||
constantPostAggregator = new ConstantPostAggregator("rei", 0.0, null);
|
||||
Assert.assertEquals(0.0, constantPostAggregator.compute(null));
|
||||
constantPostAggregator = new ConstantPostAggregator("ichi", 1.0);
|
||||
constantPostAggregator = new ConstantPostAggregator("ichi", 1.0, null);
|
||||
Assert.assertNotSame(1, constantPostAggregator.compute(null));
|
||||
}
|
||||
|
||||
@ -45,10 +46,35 @@ public class ConstantPostAggregatorTest
|
||||
public void testComparator()
|
||||
{
|
||||
ConstantPostAggregator constantPostAggregator =
|
||||
new ConstantPostAggregator("thistestbasicallydoesnothing unhappyface", 1);
|
||||
new ConstantPostAggregator("thistestbasicallydoesnothing unhappyface", 1, null);
|
||||
Comparator comp = constantPostAggregator.getComparator();
|
||||
Assert.assertEquals(0, comp.compare(0, constantPostAggregator.compute(null)));
|
||||
Assert.assertEquals(0, comp.compare(0, 1));
|
||||
Assert.assertEquals(0, comp.compare(1, 0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerdeBackwardsCompatible() throws Exception
|
||||
{
|
||||
|
||||
DefaultObjectMapper mapper = new DefaultObjectMapper();
|
||||
ConstantPostAggregator aggregator = mapper.readValue(
|
||||
"{\"type\":\"constant\",\"name\":\"thistestbasicallydoesnothing unhappyface\",\"constantValue\":1}\n",
|
||||
ConstantPostAggregator.class
|
||||
);
|
||||
Assert.assertEquals(new Integer(1), aggregator.getConstantValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
|
||||
DefaultObjectMapper mapper = new DefaultObjectMapper();
|
||||
ConstantPostAggregator aggregator = new ConstantPostAggregator("aggregator", 2, null);
|
||||
ConstantPostAggregator aggregator1 = mapper.readValue(
|
||||
mapper.writeValueAsString(aggregator),
|
||||
ConstantPostAggregator.class
|
||||
);
|
||||
Assert.assertEquals(aggregator, aggregator1);
|
||||
}
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ public class TimeseriesBinaryFnTest
|
||||
{
|
||||
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
|
||||
final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
|
||||
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
|
||||
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
|
||||
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
|
||||
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
|
||||
final ArithmeticPostAggregator addRowsIndexConstant = new ArithmeticPostAggregator(
|
||||
|
@ -47,7 +47,7 @@ public class TopNBinaryFnTest
|
||||
{
|
||||
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
|
||||
final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
|
||||
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
|
||||
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
|
||||
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
|
||||
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
|
||||
final ArithmeticPostAggregator addrowsindexconstant = new ArithmeticPostAggregator(
|
||||
|
Binary file not shown.
@ -76,7 +76,7 @@ came to the conclusion that there was nothing in the open source world that
|
||||
could be fully leveraged for our requirements.
|
||||
|
||||
We ended up creating Druid, an open-source, distributed, column-oriented,
|
||||
realtime analytical data store. In many ways, Druid shares similarities with
|
||||
real-time analytical data store. In many ways, Druid shares similarities with
|
||||
other OLAP systems \cite{oehler2012ibm, schrader2009oracle, lachev2005applied},
|
||||
interactive query systems \cite{melnik2010dremel}, main-memory databases
|
||||
\cite{farber2012sap}, and widely-known distributed data stores
|
||||
@ -122,7 +122,6 @@ edit.
|
||||
|
||||
\begin{table*}
|
||||
\centering
|
||||
\label{tab:sample_data}
|
||||
\begin{tabular}{| l | l | l | l | l | l | l | l |}
|
||||
\hline
|
||||
\textbf{Timestamp} & \textbf{Page} & \textbf{Username} & \textbf{Gender} & \textbf{City} & \textbf{Characters Added} & \textbf{Characters Removed} \\ \hline
|
||||
@ -132,6 +131,7 @@ edit.
|
||||
2011-01-01T02:00:00Z & Ke\$ha & Xeno & Male & Taiyuan & 3194 & 170 \\ \hline
|
||||
\end{tabular}
|
||||
\caption{Sample Druid data for edits that have occurred on Wikipedia.}
|
||||
\label{tab:sample_data}
|
||||
\end{table*}
|
||||
|
||||
Our goal is to rapidly compute drill-downs and aggregates over this data. We
|
||||
@ -160,7 +160,7 @@ determine business success or failure.
|
||||
|
||||
Finally, another key problem that Metamarkets faced in its early days was to
|
||||
allow users and alerting systems to be able to make business decisions in
|
||||
"real-time". The time from when an event is created to when that
|
||||
``real-time". The time from when an event is created to when that
|
||||
event is queryable determines how fast users and systems are able to react to
|
||||
potentially catastrophic occurrences in their systems. Popular open source data
|
||||
warehousing systems such as Hadoop were unable to provide the sub-second data ingestion
|
||||
@ -177,7 +177,7 @@ A Druid cluster consists of different types of nodes and each node type is
|
||||
designed to perform a specific set of things. We believe this design separates
|
||||
concerns and simplifies the complexity of the system. The different node types
|
||||
operate fairly independent of each other and there is minimal interaction
|
||||
between them. Hence, intra-cluster communication failures have minimal impact
|
||||
among them. Hence, intra-cluster communication failures have minimal impact
|
||||
on data availability. To solve complex data analysis problems, the different
|
||||
node types come together to form a fully working system. The name Druid comes
|
||||
from the Druid class in many role-playing games: it is a shape-shifter, capable
|
||||
@ -231,10 +231,10 @@ On a periodic basis, each real-time node will schedule a background task that
|
||||
searches for all locally persisted indexes. The task merges these indexes
|
||||
together and builds an immutable block of data that contains all the events
|
||||
that have ingested by a real-time node for some span of time. We refer to this
|
||||
block of data as a "segment". During the handoff stage, a real-time node
|
||||
block of data as a ``segment". During the handoff stage, a real-time node
|
||||
uploads this segment to a permanent backup storage, typically a distributed
|
||||
file system such as S3 \cite{decandia2007dynamo} or HDFS
|
||||
\cite{shvachko2010hadoop}, which Druid refers to as "deep storage". The ingest,
|
||||
\cite{shvachko2010hadoop}, which Druid refers to as ``deep storage". The ingest,
|
||||
persist, merge, and handoff steps are fluid; there is no data loss during any
|
||||
of the processes.
|
||||
|
||||
@ -260,7 +260,7 @@ collected for 13:00 to 14:00 and unannounces it is serving this data.
|
||||
\centering
|
||||
\includegraphics[width = 4.5in]{realtime_timeline}
|
||||
\caption{The node starts, ingests data, persists, and periodically hands data
|
||||
off. This process repeats indefinitely. The time intervals between different
|
||||
off. This process repeats indefinitely. The time periods between different
|
||||
real-time node operations are configurable.}
|
||||
\label{fig:realtime_timeline}
|
||||
\end{figure*}
|
||||
@ -413,7 +413,7 @@ distribution on historical nodes. The coordinator nodes tell historical nodes
|
||||
to load new data, drop outdated data, replicate data, and move data to load
|
||||
balance. Druid uses a multi-version concurrency control swapping protocol for
|
||||
managing immutable segments in order to maintain stable views. If any
|
||||
immutable segment contains data that is wholly obseleted by newer segments, the
|
||||
immutable segment contains data that is wholly obsoleted by newer segments, the
|
||||
outdated segment is dropped from the cluster. Coordinator nodes undergo a
|
||||
leader-election process that determines a single node that runs the coordinator
|
||||
functionality. The remaining coordinator nodes act as redundant backups.
|
||||
@ -436,8 +436,8 @@ Rules indicate how segments should be assigned to different historical node
|
||||
tiers and how many replicates of a segment should exist in each tier. Rules may
|
||||
also indicate when segments should be dropped entirely from the cluster. Rules
|
||||
are usually set for a period of time. For example, a user may use rules to
|
||||
load the most recent one month's worth of segments into a "hot" cluster, the
|
||||
most recent one year's worth of segments into a "cold" cluster, and drop any
|
||||
load the most recent one month's worth of segments into a ``hot" cluster, the
|
||||
most recent one year's worth of segments into a ``cold" cluster, and drop any
|
||||
segments that are older.
|
||||
|
||||
The coordinator nodes load a set of rules from a rule table in the MySQL
|
||||
@ -569,7 +569,7 @@ representations.
|
||||
\subsection{Indices for Filtering Data}
|
||||
In many real world OLAP workflows, queries are issued for the aggregated
|
||||
results of some set of metrics where some set of dimension specifications are
|
||||
met. An example query is: "How many Wikipedia edits were done by users in
|
||||
met. An example query is: ``How many Wikipedia edits were done by users in
|
||||
San Francisco who are also male?". This query is filtering the Wikipedia data
|
||||
set in Table~\ref{tab:sample_data} based on a Boolean expression of dimension
|
||||
values. In many real world data sets, dimension columns contain strings and
|
||||
@ -609,12 +609,11 @@ used in search engines. Bitmap indices for OLAP workloads is described in
|
||||
detail in \cite{o1997improved}. Bitmap compression algorithms are a
|
||||
well-defined area of research \cite{antoshenkov1995byte, wu2006optimizing,
|
||||
van2011memory} and often utilize run-length encoding. Druid opted to use the
|
||||
Concise algorithm \cite{colantonio2010concise} as it can outperform WAH by
|
||||
reducing compressed bitmap size by up to 50\%. Figure~\ref{fig:concise_plot}
|
||||
Concise algorithm \cite{colantonio2010concise}. Figure~\ref{fig:concise_plot}
|
||||
illustrates the number of bytes using Concise compression versus using an
|
||||
integer array. The results were generated on a \texttt{cc2.8xlarge} system with a single
|
||||
thread, 2G heap, 512m young gen, and a forced GC between each run. The data set
|
||||
is a single day’s worth of data collected from the Twitter garden hose
|
||||
integer array. The results were generated on a \texttt{cc2.8xlarge} system with
|
||||
a single thread, 2G heap, 512m young gen, and a forced GC between each run. The
|
||||
data set is a single day’s worth of data collected from the Twitter garden hose
|
||||
\cite{twitter2013} data stream. The data set contains 2,272,295 rows and 12
|
||||
dimensions of varying cardinality. As an additional comparison, we also
|
||||
resorted the data set rows to maximize compression.
|
||||
@ -679,9 +678,9 @@ A sample count query over a week of data is as follows:
|
||||
"aggregations" : [{"type":"count", "name":"rows"}]
|
||||
}
|
||||
\end{verbatim}}
|
||||
The query shown above will return a count of the number of rows in the Wikipedia datasource
|
||||
from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the "page" dimension is
|
||||
equal to "Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
|
||||
The query shown above will return a count of the number of rows in the Wikipedia data source
|
||||
from 2013-01-01 to 2013-01-08, filtered for only those rows where the value of the ``page" dimension is
|
||||
equal to ``Ke\$ha". The results will be bucketed by day and will be a JSON array of the following form:
|
||||
{\scriptsize\begin{verbatim}
|
||||
[ {
|
||||
"timestamp": "2012-01-01T00:00:00.000Z",
|
||||
@ -706,7 +705,7 @@ of this paper to fully describe the query API but more information can be found
|
||||
online\footnote{\href{http://druid.io/docs/latest/Querying.html}{http://druid.io/docs/latest/Querying.html}}.
|
||||
|
||||
As of this writing, a join query for Druid is not yet implemented. This has
|
||||
been a function of engineering resource allocation decisions and use case more
|
||||
been a function of engineering resource allocation and use case decisions more
|
||||
than a decision driven by technical merit. Indeed, Druid's storage format
|
||||
would allow for the implementation of joins (there is no loss of fidelity for
|
||||
columns included as dimensions) and the implementation of them has been a
|
||||
@ -724,7 +723,7 @@ a shared set of keys. The primary high-level strategies for join queries the
|
||||
authors are aware of are a hash-based strategy or a sorted-merge strategy. The
|
||||
hash-based strategy requires that all but one data set be available as
|
||||
something that looks like a hash table, a lookup operation is then performed on
|
||||
this hash table for every row in the "primary" stream. The sorted-merge
|
||||
this hash table for every row in the ``primary" stream. The sorted-merge
|
||||
strategy assumes that each stream is sorted by the join key and thus allows for
|
||||
the incremental joining of the streams. Each of these strategies, however,
|
||||
requires the materialization of some number of the streams either in sorted
|
||||
@ -751,8 +750,7 @@ Druid query performance can vary signficantly depending on the query
|
||||
being issued. For example, sorting the values of a high cardinality dimension
|
||||
based on a given metric is much more expensive than a simple count over a time
|
||||
range. To showcase the average query latencies in a production Druid cluster,
|
||||
we selected 8 of our most queried data sources, described in
|
||||
Table~\ref{tab:datasources}.
|
||||
we selected 8 of our most queried data sources, described in Table~\ref{tab:datasources}.
|
||||
|
||||
Approximately 30\% of the queries are standard
|
||||
aggregates involving different types of metrics and filters, 60\% of queries
|
||||
@ -764,7 +762,6 @@ involving all columns are very rare.
|
||||
|
||||
\begin{table}
|
||||
\centering
|
||||
\label{tab:datasources}
|
||||
\begin{tabular}{| l | l | l |}
|
||||
\hline
|
||||
\textbf{Data Source} & \textbf{Dimensions} & \textbf{Metrics} \\ \hline
|
||||
@ -778,15 +775,17 @@ involving all columns are very rare.
|
||||
\texttt{h} & 78 & 14 \\ \hline
|
||||
\end{tabular}
|
||||
\caption{Characteristics of production data sources.}
|
||||
\label{tab:datasources}
|
||||
\end{table}
|
||||
|
||||
A few notes about our results:
|
||||
\begin{itemize}[leftmargin=*,beginpenalty=5000,topsep=0pt]
|
||||
\item The results are from a "hot" tier in our production cluster. We run
|
||||
several tiers of varying performance in production.
|
||||
\item The results are from a ``hot" tier in our production cluster. There were
|
||||
approximately 50 data sources in the tier and several hundred users issuing
|
||||
queries.
|
||||
|
||||
\item There is approximately 10.5TB of RAM available in the "hot" tier and
|
||||
approximately 10TB of segments loaded (including replication). Collectively,
|
||||
\item There was approximately 10.5TB of RAM available in the ``hot" tier and
|
||||
approximately 10TB of segments loaded. Collectively,
|
||||
there are about 50 billion Druid rows in this tier. Results for
|
||||
every data source are not shown.
|
||||
|
||||
@ -798,13 +797,13 @@ threads and 672 total cores (hyperthreaded).
|
||||
\end{itemize}
|
||||
|
||||
Query latencies are shown in Figure~\ref{fig:query_latency} and the queries per
|
||||
minute is shown in Figure~\ref{fig:queries_per_min}. Across all the various
|
||||
minute are shown in Figure~\ref{fig:queries_per_min}. Across all the various
|
||||
data sources, average query latency is approximately 550 milliseconds, with
|
||||
90\% of queries returning in less than 1 second, 95\% in under 2 seconds, and
|
||||
99\% of queries taking less than 10 seconds to complete.
|
||||
Occasionally we observe spikes in latency, as observed on February 19,
|
||||
in which case network issues on the cache nodes were compounded by very high
|
||||
query load on one of our largest datasources.
|
||||
99\% of queries returning in less than 10 seconds. Occasionally we observe
|
||||
spikes in latency, as observed on February 19, in which case network issues on
|
||||
the Memcached instances were compounded by very high query load on one of our
|
||||
largest datasources.
|
||||
|
||||
\begin{figure}
|
||||
\centering
|
||||
@ -883,7 +882,7 @@ ingestion setup consists of 6 nodes, totalling 360GB of RAM and 96 cores
|
||||
(12 x Intel Xeon E5-2670).
|
||||
|
||||
Note that in this setup, several other data sources were being ingested and
|
||||
many other Druid related ingestion tasks were running concurrently on those machines.
|
||||
many other Druid related ingestion tasks were running concurrently on the machines.
|
||||
|
||||
Druid's data ingestion latency is heavily dependent on the complexity of the
|
||||
data set being ingested. The data complexity is determined by the number of
|
||||
@ -893,11 +892,10 @@ aggregations we want to perform on those metrics. With the most basic data set
|
||||
800,000 events/second/core, which is really just a measurement of how fast we can
|
||||
deserialize events. Real world data sets are never this simple.
|
||||
Table~\ref{tab:ingest_datasources} shows a selection of data sources and their
|
||||
chracteristics.
|
||||
characteristics.
|
||||
|
||||
\begin{table}
|
||||
\centering
|
||||
\label{tab:ingest_datasources}
|
||||
\begin{tabular}{| l | l | l | l |}
|
||||
\hline
|
||||
\scriptsize\textbf{Data Source} & \scriptsize\textbf{Dimensions} & \scriptsize\textbf{Metrics} & \scriptsize\textbf{Peak events/s} \\ \hline
|
||||
@ -911,6 +909,7 @@ chracteristics.
|
||||
\texttt{z} & 33 & 24 & 95747.74 \\ \hline
|
||||
\end{tabular}
|
||||
\caption{Ingestion characteristics of various data sources.}
|
||||
\label{tab:ingest_datasources}
|
||||
\end{table}
|
||||
|
||||
We can see that, based on the descriptions in
|
||||
@ -938,7 +937,7 @@ The latency measurements we presented are sufficient to address the our stated
|
||||
problems of interactivity. We would prefer the variability in the latencies to
|
||||
be less. It is still very possible to possible to decrease latencies by adding
|
||||
additional hardware, but we have not chosen to do so because infrastructure
|
||||
cost is still a consideration to us.
|
||||
costs are still a consideration to us.
|
||||
|
||||
\section{Druid in Production}\label{sec:production}
|
||||
Over the last few years, we have gained tremendous knowledge about handling
|
||||
@ -950,19 +949,19 @@ explore use case, the number of queries issued by a single user is much higher
|
||||
than in the reporting use case. Exploratory queries often involve progressively
|
||||
adding filters for the same time range to narrow down results. Users tend to
|
||||
explore short time intervals of recent data. In the generate report use case,
|
||||
users query for much longer data intervals, but users also already have the
|
||||
queries they want to issue in mind.
|
||||
users query for much longer data intervals, but users also already know the
|
||||
queries they want to issue.
|
||||
|
||||
\paragraph{Multitenancy}
|
||||
Expensive concurrent queries can be problematic in a multitenant
|
||||
environment. Queries for large datasources may end up hitting every historical
|
||||
environment. Queries for large data sources may end up hitting every historical
|
||||
node in a cluster and consume all cluster resources. Smaller, cheaper queries
|
||||
may be blocked from executing in such cases. We introduced query prioritization
|
||||
to address these issues. Each historical node is able to prioritize which
|
||||
segments it needs to scan. Proper query planning is critical for production
|
||||
workloads. Thankfully, queries for a significant amount of data tend to be for
|
||||
reporting use cases, and users are not expecting the same level of
|
||||
interactivity as when they are querying to explore data.
|
||||
reporting use cases and can be deprioritized. Users do not expect the same level of
|
||||
interactivity in this use case as when they are exploring data.
|
||||
|
||||
\paragraph{Node failures}
|
||||
Single node failures are common in distributed environments, but many nodes
|
||||
@ -976,12 +975,12 @@ historical nodes.
|
||||
|
||||
\paragraph{Data Center Outages}
|
||||
Complete cluster failures are possible, but extremely rare. If Druid is
|
||||
deployed only in a single data center, it is possible for the entire data
|
||||
only deployed in a single data center, it is possible for the entire data
|
||||
center to fail. In such cases, new machines need to be provisioned. As long as
|
||||
deep storage is still available, cluster recovery time is network bound as
|
||||
historical nodes simply need to redownload every segment from deep storage. We
|
||||
have experienced such failures in the past, and the recovery time was around
|
||||
several hours in the AWS ecosystem on several TBs of data.
|
||||
several hours in the AWS ecosystem for several TBs of data.
|
||||
|
||||
\subsection{Operational Monitoring}
|
||||
Proper monitoring is critical to run a large scale distributed cluster.
|
||||
@ -1076,7 +1075,7 @@ stores \cite{macnicol2004sybase}.
|
||||
In this paper, we presented Druid, a distributed, column-oriented, real-time
|
||||
analytical data store. Druid is designed to power high performance applications
|
||||
and is optimized for low query latencies. Druid supports streaming data
|
||||
ingestion and is fault-tolerant. We discussed how Druid benchmarks and
|
||||
ingestion and is fault-tolerant. We discussed Druid benchmarks and
|
||||
summarized key architecture aspects such
|
||||
as the storage format, query language, and general execution.
|
||||
|
||||
|
@ -9,7 +9,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -28,7 +28,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -212,12 +212,12 @@ unaryExpression returns [PostAggregator p]
|
||||
if($e.p instanceof ConstantPostAggregator) {
|
||||
ConstantPostAggregator c = (ConstantPostAggregator)$e.p;
|
||||
double v = c.getConstantValue().doubleValue() * -1;
|
||||
$p = new ConstantPostAggregator(Double.toString(v), v);
|
||||
$p = new ConstantPostAggregator(Double.toString(v), v, null);
|
||||
} else {
|
||||
$p = new ArithmeticPostAggregator(
|
||||
"-"+$e.p.getName(),
|
||||
"*",
|
||||
Lists.newArrayList($e.p, new ConstantPostAggregator("-1", -1.0))
|
||||
Lists.newArrayList($e.p, new ConstantPostAggregator("-1", -1.0, null))
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -240,7 +240,7 @@ aggregate returns [AggregatorFactory agg]
|
||||
;
|
||||
|
||||
constant returns [ConstantPostAggregator c]
|
||||
: value=NUMBER { double v = Double.parseDouble($value.text); $c = new ConstantPostAggregator(Double.toString(v), v); }
|
||||
: value=NUMBER { double v = Double.parseDouble($value.text); $c = new ConstantPostAggregator(Double.toString(v), v, null); }
|
||||
;
|
||||
|
||||
/* time filters must be top level filters */
|
||||
|
@ -29,7 +29,7 @@ public class LocalCacheProvider implements CacheProvider
|
||||
{
|
||||
@JsonProperty
|
||||
@Min(0)
|
||||
private long sizeInBytes = 0;
|
||||
private long sizeInBytes = 10485760;
|
||||
|
||||
@JsonProperty
|
||||
@Min(0)
|
||||
|
@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.segment.realtime.plumber;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class CustomVersioningPolicy implements VersioningPolicy
|
||||
{
|
||||
private final String version;
|
||||
|
||||
@JsonCreator
|
||||
public CustomVersioningPolicy(
|
||||
@JsonProperty("version") String version
|
||||
)
|
||||
{
|
||||
this.version = version == null ? new DateTime().toString() : version;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getVersion(Interval interval)
|
||||
{
|
||||
return version;
|
||||
}
|
||||
}
|
@ -1,3 +1,22 @@
|
||||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.segment.realtime.plumber;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -578,9 +578,12 @@ public class RealtimePlumber implements Plumber
|
||||
|
||||
log.info("Starting merge and push.");
|
||||
|
||||
long minTimestamp = segmentGranularity.truncate(
|
||||
DateTime minTimestampAsDate = segmentGranularity.truncate(
|
||||
rejectionPolicy.getCurrMaxTime().minus(windowMillis)
|
||||
).getMillis();
|
||||
);
|
||||
long minTimestamp = minTimestampAsDate.getMillis();
|
||||
|
||||
log.info("Found [%,d] sinks. minTimestamp [%s]", sinks.size(), minTimestampAsDate);
|
||||
|
||||
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
|
||||
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
|
||||
@ -588,9 +591,13 @@ public class RealtimePlumber implements Plumber
|
||||
if (intervalStart < minTimestamp) {
|
||||
log.info("Adding entry[%s] for merge and push.", entry);
|
||||
sinksToPush.add(entry);
|
||||
} else {
|
||||
log.warn("[%s] < [%s] Skipping persist and merge.", new DateTime(intervalStart), minTimestampAsDate);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Found [%,d] sinks to persist and merge", sinksToPush.size());
|
||||
|
||||
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
|
||||
persistAndMerge(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
@ -27,7 +27,8 @@ import org.joda.time.Period;
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class),
|
||||
@JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class),
|
||||
@JsonSubTypes.Type(name = "none", value = NoopRejectionPolicyFactory.class)
|
||||
@JsonSubTypes.Type(name = "none", value = NoopRejectionPolicyFactory.class),
|
||||
@JsonSubTypes.Type(name = "test", value = TestRejectionPolicyFactory.class)
|
||||
})
|
||||
public interface RejectionPolicyFactory
|
||||
{
|
||||
|
@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.segment.realtime.plumber;
|
||||
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Period;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TestRejectionPolicyFactory implements RejectionPolicyFactory
|
||||
{
|
||||
@Override
|
||||
public RejectionPolicy create(Period windowPeriod)
|
||||
{
|
||||
return new RejectionPolicy()
|
||||
{
|
||||
private final DateTime max = new DateTime(Long.MAX_VALUE);
|
||||
|
||||
@Override
|
||||
public DateTime getCurrMaxTime()
|
||||
{
|
||||
return max;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean accept(long timestamp)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -25,7 +25,9 @@ import org.joda.time.Interval;
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class)
|
||||
@JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class),
|
||||
@JsonSubTypes.Type(name = "custom", value = CustomVersioningPolicy.class)
|
||||
|
||||
})
|
||||
public interface VersioningPolicy
|
||||
{
|
||||
|
@ -35,7 +35,9 @@ import org.joda.time.DateTime;
|
||||
@JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class),
|
||||
@JsonSubTypes.Type(name = "loadForever", value = ForeverLoadRule.class),
|
||||
@JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
|
||||
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class)
|
||||
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
|
||||
@JsonSubTypes.Type(name = "dropForever", value = ForeverDropRule.class)
|
||||
|
||||
})
|
||||
public interface Rule
|
||||
{
|
||||
|
@ -126,7 +126,7 @@ public class DatasourcesResource
|
||||
|
||||
@GET
|
||||
@Path("/{dataSourceName}")
|
||||
@Consumes("application/json")
|
||||
@Produces("application/json")
|
||||
public Response getTheDataSource(
|
||||
@PathParam("dataSourceName") final String dataSourceName
|
||||
)
|
||||
|
@ -461,6 +461,22 @@ public class InfoResource
|
||||
).build();
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/datasources/{dataSourceName}")
|
||||
@Produces("application/json")
|
||||
public Response getTheDataSource(
|
||||
@PathParam("dataSourceName") final String dataSourceName
|
||||
)
|
||||
{
|
||||
DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
|
||||
if (dataSource == null) {
|
||||
return Response.status(Response.Status.NOT_FOUND).build();
|
||||
}
|
||||
|
||||
return Response.ok(dataSource).build();
|
||||
}
|
||||
|
||||
|
||||
@DELETE
|
||||
@Path("/datasources/{dataSourceName}")
|
||||
public Response deleteDataSource(
|
||||
|
@ -27,7 +27,7 @@
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.66-SNAPSHOT</version>
|
||||
<version>0.6.73-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -54,7 +54,7 @@ import java.util.List;
|
||||
*/
|
||||
@Command(
|
||||
name = "broker",
|
||||
description = "Runs a broker node, see http://druid.io/docs/0.6.65/Broker.html for a description"
|
||||
description = "Runs a broker node, see http://druid.io/docs/0.6.72/Broker.html for a description"
|
||||
)
|
||||
public class CliBroker extends ServerRunnable
|
||||
{
|
||||
|
@ -66,7 +66,7 @@ import java.util.List;
|
||||
*/
|
||||
@Command(
|
||||
name = "coordinator",
|
||||
description = "Runs the Coordinator, see http://druid.io/docs/0.6.65/Coordinator.html for a description."
|
||||
description = "Runs the Coordinator, see http://druid.io/docs/0.6.72/Coordinator.html for a description."
|
||||
)
|
||||
public class CliCoordinator extends ServerRunnable
|
||||
{
|
||||
|
@ -41,7 +41,7 @@ import java.util.List;
|
||||
*/
|
||||
@Command(
|
||||
name = "hadoop",
|
||||
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.65/Batch-ingestion.html for a description."
|
||||
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.72/Batch-ingestion.html for a description."
|
||||
)
|
||||
public class CliHadoopIndexer implements Runnable
|
||||
{
|
||||
|
@ -42,7 +42,7 @@ import java.util.List;
|
||||
*/
|
||||
@Command(
|
||||
name = "historical",
|
||||
description = "Runs a Historical node, see http://druid.io/docs/0.6.65/Historical.html for a description"
|
||||
description = "Runs a Historical node, see http://druid.io/docs/0.6.72/Historical.html for a description"
|
||||
)
|
||||
public class CliHistorical extends ServerRunnable
|
||||
{
|
||||
|
@ -93,7 +93,7 @@ import java.util.List;
|
||||
*/
|
||||
@Command(
|
||||
name = "overlord",
|
||||
description = "Runs an Overlord node, see http://druid.io/docs/0.6.65/Indexing-Service.html for a description"
|
||||
description = "Runs an Overlord node, see http://druid.io/docs/0.6.72/Indexing-Service.html for a description"
|
||||
)
|
||||
public class CliOverlord extends ServerRunnable
|
||||
{
|
||||
|
@ -30,7 +30,7 @@ import java.util.List;
|
||||
*/
|
||||
@Command(
|
||||
name = "realtime",
|
||||
description = "Runs a realtime node, see http://druid.io/docs/0.6.65/Realtime.html for a description"
|
||||
description = "Runs a realtime node, see http://druid.io/docs/0.6.72/Realtime.html for a description"
|
||||
)
|
||||
public class CliRealtime extends ServerRunnable
|
||||
{
|
||||
|
@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
|
||||
*/
|
||||
@Command(
|
||||
name = "realtime",
|
||||
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.65/Realtime.html for a description"
|
||||
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.72/Realtime.html for a description"
|
||||
)
|
||||
public class CliRealtimeExample extends ServerRunnable
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user