diff --git a/README.md b/README.md
index e2a7a230285..2a5f1efae96 100644
--- a/README.md
+++ b/README.md
@@ -17,4 +17,6 @@ We host documentation on our [website](http://druid.io/docs/latest/). If you wan
We have a series of tutorials to get started with Druid, starting with this [one](http://druid.io/docs/latest/Tutorial:-A-First-Look-at-Druid.html).
### Support
-Contact us through our [forum](https://groups.google.com/forum/#!forum/druid-development) or on IRC in #druid-dev on irc.freenode.net.
+Report any bugs using [GitHub issues](https://github.com/metamx/druid/issues).
+
+Contact us through our [forum](https://groups.google.com/forum/#!forum/druid-development) or on IRC in `#druid-dev` on `irc.freenode.net`.
diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index cf28bf6f200..e8ebb570a86 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/common/pom.xml b/common/pom.xml
index ebdb531c35b..0b57dcf484e 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/docs/content/Examples.md b/docs/content/Examples.md
index 3e915926b1b..2d0759abc57 100644
--- a/docs/content/Examples.md
+++ b/docs/content/Examples.md
@@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
-git checkout druid-0.6.138
+git checkout druid-0.6.139
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz
diff --git a/docs/content/Kafka-Eight.md b/docs/content/Kafka-Eight.md
index 129081a12e9..240bf31fefe 100644
--- a/docs/content/Kafka-Eight.md
+++ b/docs/content/Kafka-Eight.md
@@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
- Update realtime node's configs for Kafka 8 extensions
- e.g.
- - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.138",...]`
+ - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.139",...]`
- becomes
- - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.138",...]`
+ - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.139",...]`
- Update realtime task config for changed keys
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
diff --git a/docs/content/Production-Cluster-Configuration.md b/docs/content/Production-Cluster-Configuration.md
index a79ee275c77..2b2328b2d95 100644
--- a/docs/content/Production-Cluster-Configuration.md
+++ b/docs/content/Production-Cluster-Configuration.md
@@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/overlord
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/middlemanager
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/historical
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
diff --git a/docs/content/Realtime-Config.md b/docs/content/Realtime-Config.md
index 91fc4696a88..80d67c3ffad 100644
--- a/docs/content/Realtime-Config.md
+++ b/docs/content/Realtime-Config.md
@@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
druid.zk.service.host=localhost
@@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
diff --git a/docs/content/Simple-Cluster-Configuration.md b/docs/content/Simple-Cluster-Configuration.md
index 0742bb80689..820fb0e9d6b 100644
--- a/docs/content/Simple-Cluster-Configuration.md
+++ b/docs/content/Simple-Cluster-Configuration.md
@@ -28,7 +28,7 @@ Configuration:
-Ddruid.zk.service.host=localhost
--Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
+-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid
diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md
index 1ef439fc83f..a60942f8544 100644
--- a/docs/content/Tutorial:-A-First-Look-at-Druid.md
+++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md
@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.138
+cd druid-services-0.6.139
```
You should see a bunch of files:
diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
index a0e248e04b4..9ef08aedda3 100644
--- a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
+++ b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md
@@ -91,7 +91,7 @@ druid.service=overlord
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid
diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md
index bb8fdf15dd7..095c175a719 100644
--- a/docs/content/Tutorial:-The-Druid-Cluster.md
+++ b/docs/content/Tutorial:-The-Druid-Cluster.md
@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz)
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz)
and untar the contents within by issuing:
@@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
diff --git a/docs/content/Tutorial:-Webstream.md b/docs/content/Tutorial:-Webstream.md
index 6c8587a0368..2a23fde861f 100644
--- a/docs/content/Tutorial:-Webstream.md
+++ b/docs/content/Tutorial:-Webstream.md
@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
-cd druid-services-0.6.138
+cd druid-services-0.6.139
```
You should see a bunch of files:
diff --git a/docs/content/Twitter-Tutorial.md b/docs/content/Twitter-Tutorial.md
index d9d8a9961b7..9f8c80ad8ad 100644
--- a/docs/content/Twitter-Tutorial.md
+++ b/docs/content/Twitter-Tutorial.md
@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
# Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz).
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz).
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:
diff --git a/examples/config/historical/runtime.properties b/examples/config/historical/runtime.properties
index beeb55a5d1f..22073a9a444 100644
--- a/examples/config/historical/runtime.properties
+++ b/examples/config/historical/runtime.properties
@@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
diff --git a/examples/config/overlord/runtime.properties b/examples/config/overlord/runtime.properties
index fdc28e9d72c..feb006eaba6 100644
--- a/examples/config/overlord/runtime.properties
+++ b/examples/config/overlord/runtime.properties
@@ -4,7 +4,7 @@ druid.service=overlord
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid
diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties
index 21040399551..93bd99bf749 100644
--- a/examples/config/realtime/runtime.properties
+++ b/examples/config/realtime/runtime.properties
@@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138","io.druid.extensions:druid-rabbitmq:0.6.138"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139","io.druid.extensions:druid-rabbitmq:0.6.139"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop
diff --git a/examples/pom.xml b/examples/pom.xml
index c6efb88b0b7..321e4a226e8 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml
index 7a75386914e..1fe657ba77a 100644
--- a/hdfs-storage/pom.xml
+++ b/hdfs-storage/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/histogram/pom.xml b/histogram/pom.xml
index 0fea0bcd982..24e9937621a 100644
--- a/histogram/pom.xml
+++ b/histogram/pom.xml
@@ -27,7 +27,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 40b655af3f3..22fb248bf9b 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index be252b1a25e..09368b7c105 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 57b6c0ddaa1..37055fb4af9 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -115,7 +115,8 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
- @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
+ @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
+ @JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory
)
{
super(
@@ -142,7 +143,7 @@ public class RealtimeIndexTask extends AbstractTask
windowPeriod,
null,
null,
- rejectionPolicyFactory,
+ rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
maxPendingPersists,
spec.getShardSpec()
),
@@ -315,6 +316,7 @@ public class RealtimeIndexTask extends AbstractTask
null,
null,
null,
+ null,
0
);
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
index 6220f447bde..5280a884a55 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
@@ -54,6 +54,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
null,
1,
null,
+ null,
null
);
this.status = status;
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index 516ea984301..fc7f13ef3ad 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -207,6 +207,7 @@ public class TaskSerdeTest
new Period("PT10M"),
1,
Granularity.HOUR,
+ null,
null
);
diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java
index 7aafad53e23..ec593aeee33 100644
--- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java
@@ -49,6 +49,7 @@ public class TaskAnnouncementTest
new Period("PT10M"),
1,
Granularity.HOUR,
+ null,
null
);
final TaskStatus status = TaskStatus.running(task.getId());
diff --git a/kafka-eight/pom.xml b/kafka-eight/pom.xml
index 18ceac9ffcf..7c09c218cc7 100644
--- a/kafka-eight/pom.xml
+++ b/kafka-eight/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml
index c045bbd4a78..cbd3a85f0e0 100644
--- a/kafka-seven/pom.xml
+++ b/kafka-seven/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/pom.xml b/pom.xml
index afd2d0ba0b4..31bd84a4f63 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
io.druid
druid
pom
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
druid
druid
diff --git a/processing/pom.xml b/processing/pom.xml
index aab8f6851ed..a92d70564ee 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java
index bf0432ad5c9..efb91a9c3c0 100644
--- a/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java
+++ b/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java
@@ -185,7 +185,7 @@ public class OrderByColumnSpec
final byte[] dimensionBytes = dimension.getBytes();
final byte[] directionBytes = direction.name().getBytes();
- return ByteBuffer.allocate(dimensionBytes.length + dimensionBytes.length)
+ return ByteBuffer.allocate(dimensionBytes.length + directionBytes.length)
.put(dimensionBytes)
.put(directionBytes)
.array();
diff --git a/rabbitmq/pom.xml b/rabbitmq/pom.xml
index 942474943c4..fd953df211a 100644
--- a/rabbitmq/pom.xml
+++ b/rabbitmq/pom.xml
@@ -9,7 +9,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/s3-extensions/pom.xml b/s3-extensions/pom.xml
index 38522266519..586529ab49f 100644
--- a/s3-extensions/pom.xml
+++ b/s3-extensions/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/server/pom.xml b/server/pom.xml
index 0e0ef373f3e..d752f9a1811 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
index 611d16b476f..a90fe44d352 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java
@@ -65,7 +65,8 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
- @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
+ @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
+ @JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
@@ -81,6 +82,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
basePersistDirectory,
segmentGranularity,
versioningPolicy,
+ rejectionPolicy,
rejectionPolicyFactory,
maxPendingPersists
);
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index fb28f6c8ae1..bf1b575ff41 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -328,7 +328,6 @@ public class RealtimePlumber implements Plumber
return;
}
- File mergedFile = null;
try {
List indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
@@ -338,7 +337,7 @@ public class RealtimePlumber implements Plumber
indexes.add(queryableIndex);
}
- mergedFile = IndexMerger.mergeQueryableIndex(
+ final File mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
mergedTarget
@@ -353,23 +352,17 @@ public class RealtimePlumber implements Plumber
segmentPublisher.publishSegment(segment);
}
- catch (IOException e) {
+ catch (Exception e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval)
.emit();
if (shuttingDown) {
// We're trying to shut down, and this segment failed to push. Let's just get rid of it.
+ // This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
abandonSegment(truncatedTime, sink);
- }
- }
-
- if (mergedFile != null) {
- try {
- log.info("Deleting Index File[%s]", mergedFile);
- FileUtils.deleteDirectory(mergedFile);
- }
- catch (IOException e) {
- log.warn(e, "Error deleting directory[%s]", mergedFile);
+ } else {
+ // Delete any possibly-partially-written files, so we can try again on the next push cycle.
+ removeMergedSegment(sink);
}
}
}
@@ -648,13 +641,15 @@ public class RealtimePlumber implements Plumber
}
/**
- * Unannounces a given sink and removes all local references to it.
+ * Unannounces a given sink and removes all local references to it. It is important that this is only called
+ * from the single-threaded mergeExecutor, since otherwise chaos may ensue if merged segments are deleted while
+ * being created.
*/
protected void abandonSegment(final long truncatedTime, final Sink sink)
{
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
- FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
+ removeMergedSegment(sink);
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime);
sinkTimeline.remove(
@@ -666,7 +661,7 @@ public class RealtimePlumber implements Plumber
handoffCondition.notifyAll();
}
}
- catch (IOException e) {
+ catch (Exception e) {
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();
@@ -802,4 +797,20 @@ public class RealtimePlumber implements Plumber
}
);
}
+
+ private void removeMergedSegment(final Sink sink)
+ {
+ final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged");
+ if (mergedTarget.exists()) {
+ try {
+ log.info("Deleting Index File[%s]", mergedTarget);
+ FileUtils.deleteDirectory(mergedTarget);
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource())
+ .addData("interval", sink.getInterval())
+ .emit();
+ }
+ }
+ }
}
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
index 43020d7cc53..b3b602b8212 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java
@@ -75,7 +75,8 @@ public class RealtimePlumberSchool implements PlumberSchool
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
- @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
+ @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
+ @JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
@@ -90,7 +91,7 @@ public class RealtimePlumberSchool implements PlumberSchool
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = versioningPolicy;
- this.rejectionPolicyFactory = rejectionPolicyFactory;
+ this.rejectionPolicyFactory = (rejectionPolicy == null) ? rejectionPolicyFactory : rejectionPolicy;
this.maxPendingPersists = maxPendingPersists;
}
diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java
index c7ee45ce492..da0fd0066a3 100644
--- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java
+++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java
@@ -45,6 +45,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -73,6 +74,7 @@ public class LoadQueuePeon
private final String basePath;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService zkWritingExecutor;
+ private final ExecutorService callBackExecutor;
private final DruidCoordinatorConfig config;
private final AtomicLong queuedSize = new AtomicLong(0);
@@ -94,12 +96,14 @@ public class LoadQueuePeon
String basePath,
ObjectMapper jsonMapper,
ScheduledExecutorService zkWritingExecutor,
+ ExecutorService callbackExecutor,
DruidCoordinatorConfig config
)
{
this.curator = curator;
this.basePath = basePath;
this.jsonMapper = jsonMapper;
+ this.callBackExecutor = callbackExecutor;
this.zkWritingExecutor = zkWritingExecutor;
this.config = config;
}
@@ -333,8 +337,18 @@ public class LoadQueuePeon
default:
throw new UnsupportedOperationException();
}
- currentlyProcessing.executeCallbacks();
- currentlyProcessing = null;
+
+ callBackExecutor.execute(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ currentlyProcessing.executeCallbacks();
+ currentlyProcessing = null;
+ }
+ }
+ );
}
}
diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java b/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java
index 74841853589..d6cb2a6ba0d 100644
--- a/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java
+++ b/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/**
@@ -33,6 +34,7 @@ public class LoadQueueTaskMaster
private final CuratorFramework curator;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService peonExec;
+ private final ExecutorService callbackExec;
private final DruidCoordinatorConfig config;
@Inject
@@ -40,17 +42,19 @@ public class LoadQueueTaskMaster
CuratorFramework curator,
ObjectMapper jsonMapper,
ScheduledExecutorService peonExec,
+ ExecutorService callbackExec,
DruidCoordinatorConfig config
)
{
this.curator = curator;
this.jsonMapper = jsonMapper;
this.peonExec = peonExec;
+ this.callbackExec = callbackExec;
this.config = config;
}
public LoadQueuePeon giveMePeon(String basePath)
{
- return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config);
+ return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config);
}
}
diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java
index 4fbcd78a602..d911aef7d35 100644
--- a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java
+++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java
@@ -19,6 +19,8 @@
package io.druid.server.router;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.Query;
@@ -27,12 +29,32 @@ import io.druid.query.Query;
*/
public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
{
+ private final int minPriority;
+ private final int maxPriority;
+
+ @JsonCreator
+ public PriorityTieredBrokerSelectorStrategy(
+ @JsonProperty("minPriority") Integer minPriority,
+ @JsonProperty("maxPriority") Integer maxPriority
+ )
+ {
+ this.minPriority = minPriority == null ? 0 : minPriority;
+ this.maxPriority = maxPriority == null ? 1 : maxPriority;
+ }
+
@Override
public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{
final int priority = query.getContextPriority(0);
- if (priority < 0) {
+ if (priority < minPriority) {
+ return Optional.of(
+ Iterables.getLast(
+ tierConfig.getTierToBrokerMap().values(),
+ tierConfig.getDefaultBrokerServiceName()
+ )
+ );
+ } else if (priority >= maxPriority) {
return Optional.of(
Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java
index 395dd81e2e6..11e8ac67181 100644
--- a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java
+++ b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java
@@ -20,12 +20,15 @@
package io.druid.server.router;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer;
import org.joda.time.Period;
import javax.validation.constraints.NotNull;
+import java.util.Arrays;
import java.util.LinkedHashMap;
+import java.util.List;
/**
*/
@@ -56,7 +59,10 @@ public class TieredBrokerConfig
@JsonProperty
@NotNull
- private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]";
+ private List strategies = Arrays.asList(
+ new TimeBoundaryTieredBrokerSelectorStrategy(),
+ new PriorityTieredBrokerSelectorStrategy(0, 1)
+ );
// tier,
public LinkedHashMap getTierToBrokerMap()
@@ -93,8 +99,8 @@ public class TieredBrokerConfig
return pollPeriod;
}
- public String getStrategies()
+ public List getStrategies()
{
- return strategies;
+ return ImmutableList.copyOf(strategies);
}
}
diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java
index 3300261b7d4..268ae31407b 100644
--- a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java
+++ b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java
@@ -19,10 +19,6 @@
package io.druid.server.router;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.client.util.Lists;
-import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.Provider;
@@ -32,23 +28,12 @@ import java.util.List;
*/
public class TieredBrokerSelectorStrategiesProvider implements Provider>
{
- private final List strategies = Lists.newArrayList();
+ private final List strategies;
@Inject
- public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config)
+ public TieredBrokerSelectorStrategiesProvider(TieredBrokerConfig config)
{
- try {
- this.strategies.addAll(
- (List) jsonMapper.readValue(
- config.getStrategies(), new TypeReference>()
- {
- }
- )
- );
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
+ this.strategies = config.getStrategies();
}
@Override
diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
index 19c104a4ff0..0113ee3dbc1 100644
--- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java
@@ -73,7 +73,7 @@ public class FireDepartmentTest
new RealtimeIOConfig(
null,
new RealtimePlumberSchool(
- null, null, null, null, null, null, null, null, null, null, null, null, 0
+ null, null, null, null, null, null, null, null, null, null, null, null, null, 0
)
),
new RealtimeTuningConfig(
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index db5ae44be8f..10779eec8bd 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -149,6 +149,7 @@ public class RealtimePlumberSchoolTest
Granularity.HOUR,
new IntervalStartVersioningPolicy(),
new NoopRejectionPolicyFactory(),
+ null,
0
);
diff --git a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java
index 46348b9ff8b..cace5a4fae0 100644
--- a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java
+++ b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java
@@ -29,7 +29,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon
public LoadQueuePeonTester()
{
- super(null, null, null, null, null);
+ super(null, null, null, null, null, null);
}
@Override
diff --git a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java
index 3d1d0cd0bb0..5c2320f9885 100644
--- a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java
+++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java
@@ -22,7 +22,6 @@ package io.druid.server.router;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
-import com.metamx.common.Pair;
import com.metamx.http.client.HttpClient;
import io.druid.client.DruidServer;
import io.druid.curator.discovery.ServerDiscoveryFactory;
@@ -30,11 +29,9 @@ import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.query.Druids;
-import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
-import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.Rule;
import junit.framework.Assert;
@@ -85,7 +82,7 @@ public class TieredBrokerHostSelectorTest
}
},
factory,
- Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy())
+ Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1))
);
EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce();
EasyMock.replay(factory);
@@ -198,28 +195,50 @@ public class TieredBrokerHostSelectorTest
}
@Test
- public void testPrioritySelect() throws Exception
- {
- String brokerName = (String) brokerSelector.select(
- Druids.newTimeseriesQueryBuilder()
- .dataSource("test")
- .aggregators(Arrays.asList(new CountAggregatorFactory("count")))
- .intervals(
- new MultipleIntervalSegmentSpec(
- Arrays.asList(
- new Interval("2011-08-31/2011-09-01"),
- new Interval("2012-08-31/2012-09-01"),
- new Interval("2013-08-31/2013-09-01")
- )
- )
- )
- .context(ImmutableMap.of("priority", -1))
- .build()
- ).lhs;
+ public void testPrioritySelect() throws Exception
+ {
+ String brokerName = (String) brokerSelector.select(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+ .aggregators(Arrays.asList(new CountAggregatorFactory("count")))
+ .intervals(
+ new MultipleIntervalSegmentSpec(
+ Arrays.asList(
+ new Interval("2011-08-31/2011-09-01"),
+ new Interval("2012-08-31/2012-09-01"),
+ new Interval("2013-08-31/2013-09-01")
+ )
+ )
+ )
+ .context(ImmutableMap.of("priority", -1))
+ .build()
+ ).lhs;
- Assert.assertEquals("hotBroker", brokerName);
- }
+ Assert.assertEquals("coldBroker", brokerName);
+ }
+ @Test
+ public void testPrioritySelect2() throws Exception
+ {
+ String brokerName = (String) brokerSelector.select(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+ .aggregators(Arrays.asList(new CountAggregatorFactory("count")))
+ .intervals(
+ new MultipleIntervalSegmentSpec(
+ Arrays.asList(
+ new Interval("2011-08-31/2011-09-01"),
+ new Interval("2012-08-31/2012-09-01"),
+ new Interval("2013-08-31/2013-09-01")
+ )
+ )
+ )
+ .context(ImmutableMap.of("priority", 5))
+ .build()
+ ).lhs;
+
+ Assert.assertEquals("hotBroker", brokerName);
+ }
private static class TestRuleManager extends CoordinatorRuleManager
{
diff --git a/services/pom.xml b/services/pom.xml
index 1d52d3cff6f..f0f3090e198 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -27,7 +27,7 @@
io.druid
druid
- 0.6.139-SNAPSHOT
+ 0.6.140-SNAPSHOT
diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java
index f50eb538ce8..95919bc3de5 100644
--- a/services/src/main/java/io/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/io/druid/cli/CliCoordinator.java
@@ -61,6 +61,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;
import java.util.List;
+import java.util.concurrent.Executors;
/**
*/
@@ -128,10 +129,16 @@ public class CliCoordinator extends ServerRunnable
@Provides
@LazySingleton
public LoadQueueTaskMaster getLoadQueueTaskMaster(
- CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidCoordinatorConfig config
+ CuratorFramework curator,
+ ObjectMapper jsonMapper,
+ ScheduledExecutorFactory factory,
+ DruidCoordinatorConfig config
)
{
- return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config);
+ return new LoadQueueTaskMaster(
+ curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"),
+ Executors.newSingleThreadExecutor(), config
+ );
}
}
);