diff --git a/README.md b/README.md index e2a7a230285..2a5f1efae96 100644 --- a/README.md +++ b/README.md @@ -17,4 +17,6 @@ We host documentation on our [website](http://druid.io/docs/latest/). If you wan We have a series of tutorials to get started with Druid, starting with this [one](http://druid.io/docs/latest/Tutorial:-A-First-Look-at-Druid.html). ### Support -Contact us through our [forum](https://groups.google.com/forum/#!forum/druid-development) or on IRC in #druid-dev on irc.freenode.net. +Report any bugs using [GitHub issues](https://github.com/metamx/druid/issues). + +Contact us through our [forum](https://groups.google.com/forum/#!forum/druid-development) or on IRC in `#druid-dev` on `irc.freenode.net`. diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml index cf28bf6f200..e8ebb570a86 100644 --- a/cassandra-storage/pom.xml +++ b/cassandra-storage/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/common/pom.xml b/common/pom.xml index ebdb531c35b..0b57dcf484e 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/docs/content/Examples.md b/docs/content/Examples.md index 3e915926b1b..2d0759abc57 100644 --- a/docs/content/Examples.md +++ b/docs/content/Examples.md @@ -19,13 +19,13 @@ Clone Druid and build it: git clone https://github.com/metamx/druid.git druid cd druid git fetch --tags -git checkout druid-0.6.138 +git checkout druid-0.6.139 ./build.sh ``` ### Downloading the DSK (Druid Standalone Kit) -[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz) a stand-alone tarball and run it: +[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) a stand-alone tarball and run it: ``` bash tar -xzf druid-services-0.X.X-bin.tar.gz diff --git a/docs/content/Kafka-Eight.md b/docs/content/Kafka-Eight.md index 129081a12e9..240bf31fefe 100644 --- a/docs/content/Kafka-Eight.md +++ b/docs/content/Kafka-Eight.md @@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need - Update realtime node's configs for Kafka 8 extensions - e.g. - - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.138",...]` + - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.139",...]` - becomes - - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.138",...]` + - `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.139",...]` - Update realtime task config for changed keys - `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes. diff --git a/docs/content/Production-Cluster-Configuration.md b/docs/content/Production-Cluster-Configuration.md index a79ee275c77..2b2328b2d95 100644 --- a/docs/content/Production-Cluster-Configuration.md +++ b/docs/content/Production-Cluster-Configuration.md @@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080 druid.port=8080 druid.service=druid/prod/overlord -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"] druid.zk.service.host=#{ZK_IPs} druid.zk.paths.base=/druid/prod @@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080 druid.port=8080 druid.service=druid/prod/middlemanager -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"] druid.zk.service.host=#{ZK_IPs} druid.zk.paths.base=/druid/prod @@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080 druid.port=8080 druid.service=druid/prod/historical -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"] druid.zk.service.host=#{ZK_IPs} druid.zk.paths.base=/druid/prod diff --git a/docs/content/Realtime-Config.md b/docs/content/Realtime-Config.md index 91fc4696a88..80d67c3ffad 100644 --- a/docs/content/Realtime-Config.md +++ b/docs/content/Realtime-Config.md @@ -27,7 +27,7 @@ druid.host=localhost druid.service=realtime druid.port=8083 -druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"] +druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"] druid.zk.service.host=localhost @@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080 druid.port=8080 druid.service=druid/prod/realtime -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"] druid.zk.service.host=#{ZK_IPs} druid.zk.paths.base=/druid/prod diff --git a/docs/content/Simple-Cluster-Configuration.md b/docs/content/Simple-Cluster-Configuration.md index 0742bb80689..820fb0e9d6b 100644 --- a/docs/content/Simple-Cluster-Configuration.md +++ b/docs/content/Simple-Cluster-Configuration.md @@ -28,7 +28,7 @@ Configuration: -Ddruid.zk.service.host=localhost --Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"] +-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"] -Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid -Ddruid.db.connector.user=druid diff --git a/docs/content/Tutorial:-A-First-Look-at-Druid.md b/docs/content/Tutorial:-A-First-Look-at-Druid.md index 1ef439fc83f..a60942f8544 100644 --- a/docs/content/Tutorial:-A-First-Look-at-Druid.md +++ b/docs/content/Tutorial:-A-First-Look-at-Druid.md @@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu ### Download a Tarball -We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz). Download this file to a directory of your choosing. +We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz). Download this file to a directory of your choosing. You can extract the awesomeness within by issuing: @@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz Not too lost so far right? That's great! If you cd into the directory: ``` -cd druid-services-0.6.138 +cd druid-services-0.6.139 ``` You should see a bunch of files: diff --git a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md index a0e248e04b4..9ef08aedda3 100644 --- a/docs/content/Tutorial:-Loading-Your-Data-Part-1.md +++ b/docs/content/Tutorial:-Loading-Your-Data-Part-1.md @@ -91,7 +91,7 @@ druid.service=overlord druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"] +druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"] druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid druid.db.connector.user=druid diff --git a/docs/content/Tutorial:-The-Druid-Cluster.md b/docs/content/Tutorial:-The-Druid-Cluster.md index bb8fdf15dd7..095c175a719 100644 --- a/docs/content/Tutorial:-The-Druid-Cluster.md +++ b/docs/content/Tutorial:-The-Druid-Cluster.md @@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first. -You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz) +You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) and untar the contents within by issuing: @@ -149,7 +149,7 @@ druid.port=8081 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"] +druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"] # Dummy read only AWS account (used to download example data) druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b @@ -240,7 +240,7 @@ druid.port=8083 druid.zk.service.host=localhost -druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"] +druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"] # Change this config to db to hand off to the rest of the Druid cluster druid.publish.type=noop diff --git a/docs/content/Tutorial:-Webstream.md b/docs/content/Tutorial:-Webstream.md index 6c8587a0368..2a23fde861f 100644 --- a/docs/content/Tutorial:-Webstream.md +++ b/docs/content/Tutorial:-Webstream.md @@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu h3. Download a Tarball -We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz) +We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) Download this file to a directory of your choosing. You can extract the awesomeness within by issuing: @@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz Not too lost so far right? That's great! If you cd into the directory: ``` -cd druid-services-0.6.138 +cd druid-services-0.6.139 ``` You should see a bunch of files: diff --git a/docs/content/Twitter-Tutorial.md b/docs/content/Twitter-Tutorial.md index d9d8a9961b7..9f8c80ad8ad 100644 --- a/docs/content/Twitter-Tutorial.md +++ b/docs/content/Twitter-Tutorial.md @@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source. # Download a Tarball -We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz). +We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz). Download this bad boy to a directory of your choosing. You can extract the awesomeness within by issuing: diff --git a/examples/bin/run_example_server.sh b/examples/bin/run_example_server.sh index 3d63e7cb0c3..461f937beb0 100755 --- a/examples/bin/run_example_server.sh +++ b/examples/bin/run_example_server.sh @@ -58,6 +58,7 @@ DRUID_CP=${EXAMPLE_LOC} DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime #For the kit DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/* +DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/_global DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/realtime echo "Running command:" diff --git a/examples/config/global.runtime.properties b/examples/config/_global/global.runtime.properties similarity index 100% rename from examples/config/global.runtime.properties rename to examples/config/_global/global.runtime.properties diff --git a/examples/config/broker/runtime.properties b/examples/config/broker/runtime.properties index 31799a7caf9..23d1170343b 100644 --- a/examples/config/broker/runtime.properties +++ b/examples/config/broker/runtime.properties @@ -2,6 +2,6 @@ druid.host=localhost druid.service=broker druid.port=8080 -# Add more threads or larger buffer for faster groupBys +# Bump these up only for faster nested groupBy druid.processing.buffer.sizeBytes=100000000 druid.processing.numThreads=1 diff --git a/examples/config/realtime/runtime.properties b/examples/config/realtime/runtime.properties index a78e76686db..b08d74a19ea 100644 --- a/examples/config/realtime/runtime.properties +++ b/examples/config/realtime/runtime.properties @@ -6,4 +6,7 @@ druid.port=8083 druid.publish.type=noop druid.processing.buffer.sizeBytes=100000000 -druid.processing.numThreads=1 \ No newline at end of file +druid.processing.numThreads=1 + +# Enable Real monitoring +# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor","io.druid.segment.realtime.RealtimeMetricsMonitor"] \ No newline at end of file diff --git a/examples/pom.xml b/examples/pom.xml index c6efb88b0b7..321e4a226e8 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/hdfs-storage/pom.xml b/hdfs-storage/pom.xml index 7a75386914e..1fe657ba77a 100644 --- a/hdfs-storage/pom.xml +++ b/hdfs-storage/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/histogram/pom.xml b/histogram/pom.xml index 0fea0bcd982..24e9937621a 100644 --- a/histogram/pom.xml +++ b/histogram/pom.xml @@ -27,7 +27,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index 40b655af3f3..22fb248bf9b 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index be252b1a25e..09368b7c105 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 57b6c0ddaa1..37055fb4af9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -115,7 +115,8 @@ public class RealtimeIndexTask extends AbstractTask @JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("maxPendingPersists") int maxPendingPersists, @JsonProperty("segmentGranularity") Granularity segmentGranularity, - @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory + @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy, + @JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory ) { super( @@ -142,7 +143,7 @@ public class RealtimeIndexTask extends AbstractTask windowPeriod, null, null, - rejectionPolicyFactory, + rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy, maxPendingPersists, spec.getShardSpec() ), @@ -315,6 +316,7 @@ public class RealtimeIndexTask extends AbstractTask null, null, null, + null, 0 ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index 6220f447bde..5280a884a55 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -54,6 +54,7 @@ public class TestRealtimeTask extends RealtimeIndexTask null, 1, null, + null, null ); this.status = status; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 516ea984301..fc7f13ef3ad 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -207,6 +207,7 @@ public class TaskSerdeTest new Period("PT10M"), 1, Granularity.HOUR, + null, null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java index 7aafad53e23..ec593aeee33 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java @@ -49,6 +49,7 @@ public class TaskAnnouncementTest new Period("PT10M"), 1, Granularity.HOUR, + null, null ); final TaskStatus status = TaskStatus.running(task.getId()); diff --git a/kafka-eight/pom.xml b/kafka-eight/pom.xml index 18ceac9ffcf..7c09c218cc7 100644 --- a/kafka-eight/pom.xml +++ b/kafka-eight/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml index c045bbd4a78..cbd3a85f0e0 100644 --- a/kafka-seven/pom.xml +++ b/kafka-seven/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/pom.xml b/pom.xml index 67ab9cd7ee0..41e1e9ee7e6 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ io.druid druid pom - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT druid druid diff --git a/processing/pom.xml b/processing/pom.xml index aab8f6851ed..a92d70564ee 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java b/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java index bf0432ad5c9..efb91a9c3c0 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/OrderByColumnSpec.java @@ -185,7 +185,7 @@ public class OrderByColumnSpec final byte[] dimensionBytes = dimension.getBytes(); final byte[] directionBytes = direction.name().getBytes(); - return ByteBuffer.allocate(dimensionBytes.length + dimensionBytes.length) + return ByteBuffer.allocate(dimensionBytes.length + directionBytes.length) .put(dimensionBytes) .put(directionBytes) .array(); diff --git a/rabbitmq/pom.xml b/rabbitmq/pom.xml index 942474943c4..fd953df211a 100644 --- a/rabbitmq/pom.xml +++ b/rabbitmq/pom.xml @@ -9,7 +9,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/s3-extensions/pom.xml b/s3-extensions/pom.xml index 38522266519..586529ab49f 100644 --- a/s3-extensions/pom.xml +++ b/s3-extensions/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/server/pom.xml b/server/pom.xml index 0e0ef373f3e..d752f9a1811 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java index 611d16b476f..a90fe44d352 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -65,7 +65,8 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("segmentGranularity") Granularity segmentGranularity, @JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy, - @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory, + @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy, + @JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory, @JsonProperty("maxPendingPersists") int maxPendingPersists ) { @@ -81,6 +82,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool basePersistDirectory, segmentGranularity, versioningPolicy, + rejectionPolicy, rejectionPolicyFactory, maxPendingPersists ); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index fb28f6c8ae1..bf1b575ff41 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -328,7 +328,6 @@ public class RealtimePlumber implements Plumber return; } - File mergedFile = null; try { List indexes = Lists.newArrayList(); for (FireHydrant fireHydrant : sink) { @@ -338,7 +337,7 @@ public class RealtimePlumber implements Plumber indexes.add(queryableIndex); } - mergedFile = IndexMerger.mergeQueryableIndex( + final File mergedFile = IndexMerger.mergeQueryableIndex( indexes, schema.getAggregators(), mergedTarget @@ -353,23 +352,17 @@ public class RealtimePlumber implements Plumber segmentPublisher.publishSegment(segment); } - catch (IOException e) { + catch (Exception e) { log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) .addData("interval", interval) .emit(); if (shuttingDown) { // We're trying to shut down, and this segment failed to push. Let's just get rid of it. + // This call will also delete possibly-partially-written files, so we don't need to do it explicitly. abandonSegment(truncatedTime, sink); - } - } - - if (mergedFile != null) { - try { - log.info("Deleting Index File[%s]", mergedFile); - FileUtils.deleteDirectory(mergedFile); - } - catch (IOException e) { - log.warn(e, "Error deleting directory[%s]", mergedFile); + } else { + // Delete any possibly-partially-written files, so we can try again on the next push cycle. + removeMergedSegment(sink); } } } @@ -648,13 +641,15 @@ public class RealtimePlumber implements Plumber } /** - * Unannounces a given sink and removes all local references to it. + * Unannounces a given sink and removes all local references to it. It is important that this is only called + * from the single-threaded mergeExecutor, since otherwise chaos may ensue if merged segments are deleted while + * being created. */ protected void abandonSegment(final long truncatedTime, final Sink sink) { try { segmentAnnouncer.unannounceSegment(sink.getSegment()); - FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); + removeMergedSegment(sink); log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); sinks.remove(truncatedTime); sinkTimeline.remove( @@ -666,7 +661,7 @@ public class RealtimePlumber implements Plumber handoffCondition.notifyAll(); } } - catch (IOException e) { + catch (Exception e) { log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) .addData("interval", sink.getInterval()) .emit(); @@ -802,4 +797,20 @@ public class RealtimePlumber implements Plumber } ); } + + private void removeMergedSegment(final Sink sink) + { + final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged"); + if (mergedTarget.exists()) { + try { + log.info("Deleting Index File[%s]", mergedTarget); + FileUtils.deleteDirectory(mergedTarget); + } + catch (Exception e) { + log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .emit(); + } + } + } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 43020d7cc53..b3b602b8212 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -75,7 +75,8 @@ public class RealtimePlumberSchool implements PlumberSchool @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("segmentGranularity") Granularity segmentGranularity, @JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy, - @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory, + @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy, + @JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory, @JsonProperty("maxPendingPersists") int maxPendingPersists ) { @@ -90,7 +91,7 @@ public class RealtimePlumberSchool implements PlumberSchool this.basePersistDirectory = basePersistDirectory; this.segmentGranularity = segmentGranularity; this.versioningPolicy = versioningPolicy; - this.rejectionPolicyFactory = rejectionPolicyFactory; + this.rejectionPolicyFactory = (rejectionPolicy == null) ? rejectionPolicyFactory : rejectionPolicy; this.maxPendingPersists = maxPendingPersists; } diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java index c7ee45ce492..da0fd0066a3 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java @@ -45,6 +45,7 @@ import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -73,6 +74,7 @@ public class LoadQueuePeon private final String basePath; private final ObjectMapper jsonMapper; private final ScheduledExecutorService zkWritingExecutor; + private final ExecutorService callBackExecutor; private final DruidCoordinatorConfig config; private final AtomicLong queuedSize = new AtomicLong(0); @@ -94,12 +96,14 @@ public class LoadQueuePeon String basePath, ObjectMapper jsonMapper, ScheduledExecutorService zkWritingExecutor, + ExecutorService callbackExecutor, DruidCoordinatorConfig config ) { this.curator = curator; this.basePath = basePath; this.jsonMapper = jsonMapper; + this.callBackExecutor = callbackExecutor; this.zkWritingExecutor = zkWritingExecutor; this.config = config; } @@ -333,8 +337,18 @@ public class LoadQueuePeon default: throw new UnsupportedOperationException(); } - currentlyProcessing.executeCallbacks(); - currentlyProcessing = null; + + callBackExecutor.execute( + new Runnable() + { + @Override + public void run() + { + currentlyProcessing.executeCallbacks(); + currentlyProcessing = null; + } + } + ); } } diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java b/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java index 74841853589..d6cb2a6ba0d 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueueTaskMaster.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import org.apache.curator.framework.CuratorFramework; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; /** @@ -33,6 +34,7 @@ public class LoadQueueTaskMaster private final CuratorFramework curator; private final ObjectMapper jsonMapper; private final ScheduledExecutorService peonExec; + private final ExecutorService callbackExec; private final DruidCoordinatorConfig config; @Inject @@ -40,17 +42,19 @@ public class LoadQueueTaskMaster CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorService peonExec, + ExecutorService callbackExec, DruidCoordinatorConfig config ) { this.curator = curator; this.jsonMapper = jsonMapper; this.peonExec = peonExec; + this.callbackExec = callbackExec; this.config = config; } public LoadQueuePeon giveMePeon(String basePath) { - return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config); + return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config); } } diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java index 4fbcd78a602..d911aef7d35 100644 --- a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java +++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java @@ -19,6 +19,8 @@ package io.druid.server.router; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.collect.Iterables; import io.druid.query.Query; @@ -27,12 +29,32 @@ import io.druid.query.Query; */ public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy { + private final int minPriority; + private final int maxPriority; + + @JsonCreator + public PriorityTieredBrokerSelectorStrategy( + @JsonProperty("minPriority") Integer minPriority, + @JsonProperty("maxPriority") Integer maxPriority + ) + { + this.minPriority = minPriority == null ? 0 : minPriority; + this.maxPriority = maxPriority == null ? 1 : maxPriority; + } + @Override public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) { final int priority = query.getContextPriority(0); - if (priority < 0) { + if (priority < minPriority) { + return Optional.of( + Iterables.getLast( + tierConfig.getTierToBrokerMap().values(), + tierConfig.getDefaultBrokerServiceName() + ) + ); + } else if (priority >= maxPriority) { return Optional.of( Iterables.getFirst( tierConfig.getTierToBrokerMap().values(), diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java index 395dd81e2e6..11e8ac67181 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java @@ -20,12 +20,15 @@ package io.druid.server.router; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.druid.client.DruidServer; import org.joda.time.Period; import javax.validation.constraints.NotNull; +import java.util.Arrays; import java.util.LinkedHashMap; +import java.util.List; /** */ @@ -56,7 +59,10 @@ public class TieredBrokerConfig @JsonProperty @NotNull - private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]"; + private List strategies = Arrays.asList( + new TimeBoundaryTieredBrokerSelectorStrategy(), + new PriorityTieredBrokerSelectorStrategy(0, 1) + ); // tier, public LinkedHashMap getTierToBrokerMap() @@ -93,8 +99,8 @@ public class TieredBrokerConfig return pollPeriod; } - public String getStrategies() + public List getStrategies() { - return strategies; + return ImmutableList.copyOf(strategies); } } diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java index 3300261b7d4..268ae31407b 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java @@ -19,10 +19,6 @@ package io.druid.server.router; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.client.util.Lists; -import com.google.common.base.Throwables; import com.google.inject.Inject; import com.google.inject.Provider; @@ -32,23 +28,12 @@ import java.util.List; */ public class TieredBrokerSelectorStrategiesProvider implements Provider> { - private final List strategies = Lists.newArrayList(); + private final List strategies; @Inject - public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config) + public TieredBrokerSelectorStrategiesProvider(TieredBrokerConfig config) { - try { - this.strategies.addAll( - (List) jsonMapper.readValue( - config.getStrategies(), new TypeReference>() - { - } - ) - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } + this.strategies = config.getStrategies(); } @Override diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 19c104a4ff0..0113ee3dbc1 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -73,7 +73,7 @@ public class FireDepartmentTest new RealtimeIOConfig( null, new RealtimePlumberSchool( - null, null, null, null, null, null, null, null, null, null, null, null, 0 + null, null, null, null, null, null, null, null, null, null, null, null, null, 0 ) ), new RealtimeTuningConfig( diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index db5ae44be8f..10779eec8bd 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -149,6 +149,7 @@ public class RealtimePlumberSchoolTest Granularity.HOUR, new IntervalStartVersioningPolicy(), new NoopRejectionPolicyFactory(), + null, 0 ); diff --git a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java index 46348b9ff8b..cace5a4fae0 100644 --- a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java +++ b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTester.java @@ -29,7 +29,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon public LoadQueuePeonTester() { - super(null, null, null, null, null); + super(null, null, null, null, null, null); } @Override diff --git a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java index 3d1d0cd0bb0..5c2320f9885 100644 --- a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java @@ -22,7 +22,6 @@ package io.druid.server.router; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; -import com.metamx.common.Pair; import com.metamx.http.client.HttpClient; import io.druid.client.DruidServer; import io.druid.curator.discovery.ServerDiscoveryFactory; @@ -30,11 +29,9 @@ import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; import io.druid.query.Druids; -import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.spec.MultipleIntervalSegmentSpec; -import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.server.coordinator.rules.IntervalLoadRule; import io.druid.server.coordinator.rules.Rule; import junit.framework.Assert; @@ -85,7 +82,7 @@ public class TieredBrokerHostSelectorTest } }, factory, - Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy()) + Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1)) ); EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce(); EasyMock.replay(factory); @@ -198,28 +195,50 @@ public class TieredBrokerHostSelectorTest } @Test - public void testPrioritySelect() throws Exception - { - String brokerName = (String) brokerSelector.select( - Druids.newTimeseriesQueryBuilder() - .dataSource("test") - .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) - .intervals( - new MultipleIntervalSegmentSpec( - Arrays.asList( - new Interval("2011-08-31/2011-09-01"), - new Interval("2012-08-31/2012-09-01"), - new Interval("2013-08-31/2013-09-01") - ) - ) - ) - .context(ImmutableMap.of("priority", -1)) - .build() - ).lhs; + public void testPrioritySelect() throws Exception + { + String brokerName = (String) brokerSelector.select( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) + .intervals( + new MultipleIntervalSegmentSpec( + Arrays.asList( + new Interval("2011-08-31/2011-09-01"), + new Interval("2012-08-31/2012-09-01"), + new Interval("2013-08-31/2013-09-01") + ) + ) + ) + .context(ImmutableMap.of("priority", -1)) + .build() + ).lhs; - Assert.assertEquals("hotBroker", brokerName); - } + Assert.assertEquals("coldBroker", brokerName); + } + @Test + public void testPrioritySelect2() throws Exception + { + String brokerName = (String) brokerSelector.select( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) + .intervals( + new MultipleIntervalSegmentSpec( + Arrays.asList( + new Interval("2011-08-31/2011-09-01"), + new Interval("2012-08-31/2012-09-01"), + new Interval("2013-08-31/2013-09-01") + ) + ) + ) + .context(ImmutableMap.of("priority", 5)) + .build() + ).lhs; + + Assert.assertEquals("hotBroker", brokerName); + } private static class TestRuleManager extends CoordinatorRuleManager { diff --git a/services/pom.xml b/services/pom.xml index 1d52d3cff6f..f0f3090e198 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -27,7 +27,7 @@ io.druid druid - 0.6.139-SNAPSHOT + 0.6.140-SNAPSHOT diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index f50eb538ce8..95919bc3de5 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -61,6 +61,7 @@ import org.apache.curator.framework.CuratorFramework; import org.eclipse.jetty.server.Server; import java.util.List; +import java.util.concurrent.Executors; /** */ @@ -128,10 +129,16 @@ public class CliCoordinator extends ServerRunnable @Provides @LazySingleton public LoadQueueTaskMaster getLoadQueueTaskMaster( - CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidCoordinatorConfig config + CuratorFramework curator, + ObjectMapper jsonMapper, + ScheduledExecutorFactory factory, + DruidCoordinatorConfig config ) { - return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config); + return new LoadQueueTaskMaster( + curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), + Executors.newSingleThreadExecutor(), config + ); } } );