mirror of https://github.com/apache/druid.git
Merge branch 'master' into new-init
Conflicts: examples/config/historical/runtime.properties examples/config/overlord/runtime.properties examples/config/realtime/runtime.properties
This commit is contained in:
commit
f3970bb1d8
|
@ -17,4 +17,6 @@ We host documentation on our [website](http://druid.io/docs/latest/). If you wan
|
|||
We have a series of tutorials to get started with Druid, starting with this [one](http://druid.io/docs/latest/Tutorial:-A-First-Look-at-Druid.html).
|
||||
|
||||
### Support
|
||||
Contact us through our [forum](https://groups.google.com/forum/#!forum/druid-development) or on IRC in #druid-dev on irc.freenode.net.
|
||||
Report any bugs using [GitHub issues](https://github.com/metamx/druid/issues).
|
||||
|
||||
Contact us through our [forum](https://groups.google.com/forum/#!forum/druid-development) or on IRC in `#druid-dev` on `irc.freenode.net`.
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -19,13 +19,13 @@ Clone Druid and build it:
|
|||
git clone https://github.com/metamx/druid.git druid
|
||||
cd druid
|
||||
git fetch --tags
|
||||
git checkout druid-0.6.138
|
||||
git checkout druid-0.6.139
|
||||
./build.sh
|
||||
```
|
||||
|
||||
### Downloading the DSK (Druid Standalone Kit)
|
||||
|
||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz) a stand-alone tarball and run it:
|
||||
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) a stand-alone tarball and run it:
|
||||
|
||||
``` bash
|
||||
tar -xzf druid-services-0.X.X-bin.tar.gz
|
||||
|
|
|
@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
|
|||
|
||||
- Update realtime node's configs for Kafka 8 extensions
|
||||
- e.g.
|
||||
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.138",...]`
|
||||
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.139",...]`
|
||||
- becomes
|
||||
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.138",...]`
|
||||
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.139",...]`
|
||||
- Update realtime task config for changed keys
|
||||
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/overlord
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/middlemanager
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/historical
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
|
|
@ -27,7 +27,7 @@ druid.host=localhost
|
|||
druid.service=realtime
|
||||
druid.port=8083
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
|
||||
|
||||
druid.zk.service.host=localhost
|
||||
|
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
|
|||
druid.port=8080
|
||||
druid.service=druid/prod/realtime
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
|
||||
druid.zk.service.host=#{ZK_IPs}
|
||||
druid.zk.paths.base=/druid/prod
|
||||
|
|
|
@ -28,7 +28,7 @@ Configuration:
|
|||
|
||||
-Ddruid.zk.service.host=localhost
|
||||
|
||||
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
|
||||
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
|
||||
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
|
||||
-Ddruid.db.connector.user=druid
|
||||
|
|
|
@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
|||
|
||||
### Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz). Download this file to a directory of your choosing.
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz). Download this file to a directory of your choosing.
|
||||
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
||||
|
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
|
|||
Not too lost so far right? That's great! If you cd into the directory:
|
||||
|
||||
```
|
||||
cd druid-services-0.6.138
|
||||
cd druid-services-0.6.139
|
||||
```
|
||||
|
||||
You should see a bunch of files:
|
||||
|
|
|
@ -91,7 +91,7 @@ druid.service=overlord
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
|
||||
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
|
||||
druid.db.connector.user=druid
|
||||
|
|
|
@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
|
|||
|
||||
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
|
||||
|
||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz)
|
||||
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz)
|
||||
|
||||
and untar the contents within by issuing:
|
||||
|
||||
|
@ -149,7 +149,7 @@ druid.port=8081
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
|
||||
|
||||
# Dummy read only AWS account (used to download example data)
|
||||
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
|
||||
|
@ -240,7 +240,7 @@ druid.port=8083
|
|||
|
||||
druid.zk.service.host=localhost
|
||||
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"]
|
||||
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
|
||||
|
||||
# Change this config to db to hand off to the rest of the Druid cluster
|
||||
druid.publish.type=noop
|
||||
|
|
|
@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
|
|||
|
||||
h3. Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz)
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz)
|
||||
Download this file to a directory of your choosing.
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
||||
|
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
|
|||
Not too lost so far right? That's great! If you cd into the directory:
|
||||
|
||||
```
|
||||
cd druid-services-0.6.138
|
||||
cd druid-services-0.6.139
|
||||
```
|
||||
|
||||
You should see a bunch of files:
|
||||
|
|
|
@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
|
|||
|
||||
# Download a Tarball
|
||||
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz).
|
||||
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz).
|
||||
Download this bad boy to a directory of your choosing.
|
||||
|
||||
You can extract the awesomeness within by issuing:
|
||||
|
|
|
@ -58,6 +58,7 @@ DRUID_CP=${EXAMPLE_LOC}
|
|||
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime
|
||||
#For the kit
|
||||
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/*
|
||||
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/_global
|
||||
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/realtime
|
||||
|
||||
echo "Running command:"
|
||||
|
|
|
@ -2,6 +2,6 @@ druid.host=localhost
|
|||
druid.service=broker
|
||||
druid.port=8080
|
||||
|
||||
# Add more threads or larger buffer for faster groupBys
|
||||
# Bump these up only for faster nested groupBy
|
||||
druid.processing.buffer.sizeBytes=100000000
|
||||
druid.processing.numThreads=1
|
||||
|
|
|
@ -6,4 +6,7 @@ druid.port=8083
|
|||
druid.publish.type=noop
|
||||
|
||||
druid.processing.buffer.sizeBytes=100000000
|
||||
druid.processing.numThreads=1
|
||||
druid.processing.numThreads=1
|
||||
|
||||
# Enable Real monitoring
|
||||
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor","io.druid.segment.realtime.RealtimeMetricsMonitor"]
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -115,7 +115,8 @@ public class RealtimeIndexTask extends AbstractTask
|
|||
@JsonProperty("windowPeriod") Period windowPeriod,
|
||||
@JsonProperty("maxPendingPersists") int maxPendingPersists,
|
||||
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
|
||||
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -142,7 +143,7 @@ public class RealtimeIndexTask extends AbstractTask
|
|||
windowPeriod,
|
||||
null,
|
||||
null,
|
||||
rejectionPolicyFactory,
|
||||
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
|
||||
maxPendingPersists,
|
||||
spec.getShardSpec()
|
||||
),
|
||||
|
@ -315,6 +316,7 @@ public class RealtimeIndexTask extends AbstractTask
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
0
|
||||
);
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
|
|||
null,
|
||||
1,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
this.status = status;
|
||||
|
|
|
@ -207,6 +207,7 @@ public class TaskSerdeTest
|
|||
new Period("PT10M"),
|
||||
1,
|
||||
Granularity.HOUR,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ public class TaskAnnouncementTest
|
|||
new Period("PT10M"),
|
||||
1,
|
||||
Granularity.HOUR,
|
||||
null,
|
||||
null
|
||||
);
|
||||
final TaskStatus status = TaskStatus.running(task.getId());
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -23,7 +23,7 @@
|
|||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
<name>druid</name>
|
||||
<description>druid</description>
|
||||
<scm>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -185,7 +185,7 @@ public class OrderByColumnSpec
|
|||
final byte[] dimensionBytes = dimension.getBytes();
|
||||
final byte[] directionBytes = direction.name().getBytes();
|
||||
|
||||
return ByteBuffer.allocate(dimensionBytes.length + dimensionBytes.length)
|
||||
return ByteBuffer.allocate(dimensionBytes.length + directionBytes.length)
|
||||
.put(dimensionBytes)
|
||||
.put(directionBytes)
|
||||
.array();
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -65,7 +65,8 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
|
|||
@JsonProperty("basePersistDirectory") File basePersistDirectory,
|
||||
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
|
||||
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
|
||||
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory,
|
||||
@JsonProperty("maxPendingPersists") int maxPendingPersists
|
||||
)
|
||||
{
|
||||
|
@ -81,6 +82,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
|
|||
basePersistDirectory,
|
||||
segmentGranularity,
|
||||
versioningPolicy,
|
||||
rejectionPolicy,
|
||||
rejectionPolicyFactory,
|
||||
maxPendingPersists
|
||||
);
|
||||
|
|
|
@ -328,7 +328,6 @@ public class RealtimePlumber implements Plumber
|
|||
return;
|
||||
}
|
||||
|
||||
File mergedFile = null;
|
||||
try {
|
||||
List<QueryableIndex> indexes = Lists.newArrayList();
|
||||
for (FireHydrant fireHydrant : sink) {
|
||||
|
@ -338,7 +337,7 @@ public class RealtimePlumber implements Plumber
|
|||
indexes.add(queryableIndex);
|
||||
}
|
||||
|
||||
mergedFile = IndexMerger.mergeQueryableIndex(
|
||||
final File mergedFile = IndexMerger.mergeQueryableIndex(
|
||||
indexes,
|
||||
schema.getAggregators(),
|
||||
mergedTarget
|
||||
|
@ -353,23 +352,17 @@ public class RealtimePlumber implements Plumber
|
|||
|
||||
segmentPublisher.publishSegment(segment);
|
||||
}
|
||||
catch (IOException e) {
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
|
||||
.addData("interval", interval)
|
||||
.emit();
|
||||
if (shuttingDown) {
|
||||
// We're trying to shut down, and this segment failed to push. Let's just get rid of it.
|
||||
// This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
|
||||
abandonSegment(truncatedTime, sink);
|
||||
}
|
||||
}
|
||||
|
||||
if (mergedFile != null) {
|
||||
try {
|
||||
log.info("Deleting Index File[%s]", mergedFile);
|
||||
FileUtils.deleteDirectory(mergedFile);
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.warn(e, "Error deleting directory[%s]", mergedFile);
|
||||
} else {
|
||||
// Delete any possibly-partially-written files, so we can try again on the next push cycle.
|
||||
removeMergedSegment(sink);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -648,13 +641,15 @@ public class RealtimePlumber implements Plumber
|
|||
}
|
||||
|
||||
/**
|
||||
* Unannounces a given sink and removes all local references to it.
|
||||
* Unannounces a given sink and removes all local references to it. It is important that this is only called
|
||||
* from the single-threaded mergeExecutor, since otherwise chaos may ensue if merged segments are deleted while
|
||||
* being created.
|
||||
*/
|
||||
protected void abandonSegment(final long truncatedTime, final Sink sink)
|
||||
{
|
||||
try {
|
||||
segmentAnnouncer.unannounceSegment(sink.getSegment());
|
||||
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
|
||||
removeMergedSegment(sink);
|
||||
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
|
||||
sinks.remove(truncatedTime);
|
||||
sinkTimeline.remove(
|
||||
|
@ -666,7 +661,7 @@ public class RealtimePlumber implements Plumber
|
|||
handoffCondition.notifyAll();
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
|
||||
.addData("interval", sink.getInterval())
|
||||
.emit();
|
||||
|
@ -802,4 +797,20 @@ public class RealtimePlumber implements Plumber
|
|||
}
|
||||
);
|
||||
}
|
||||
|
||||
private void removeMergedSegment(final Sink sink)
|
||||
{
|
||||
final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged");
|
||||
if (mergedTarget.exists()) {
|
||||
try {
|
||||
log.info("Deleting Index File[%s]", mergedTarget);
|
||||
FileUtils.deleteDirectory(mergedTarget);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource())
|
||||
.addData("interval", sink.getInterval())
|
||||
.emit();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,7 +75,8 @@ public class RealtimePlumberSchool implements PlumberSchool
|
|||
@JsonProperty("basePersistDirectory") File basePersistDirectory,
|
||||
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
|
||||
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
|
||||
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory,
|
||||
@JsonProperty("maxPendingPersists") int maxPendingPersists
|
||||
)
|
||||
{
|
||||
|
@ -90,7 +91,7 @@ public class RealtimePlumberSchool implements PlumberSchool
|
|||
this.basePersistDirectory = basePersistDirectory;
|
||||
this.segmentGranularity = segmentGranularity;
|
||||
this.versioningPolicy = versioningPolicy;
|
||||
this.rejectionPolicyFactory = rejectionPolicyFactory;
|
||||
this.rejectionPolicyFactory = (rejectionPolicy == null) ? rejectionPolicyFactory : rejectionPolicy;
|
||||
this.maxPendingPersists = maxPendingPersists;
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ import java.util.Comparator;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -73,6 +74,7 @@ public class LoadQueuePeon
|
|||
private final String basePath;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ScheduledExecutorService zkWritingExecutor;
|
||||
private final ExecutorService callBackExecutor;
|
||||
private final DruidCoordinatorConfig config;
|
||||
|
||||
private final AtomicLong queuedSize = new AtomicLong(0);
|
||||
|
@ -94,12 +96,14 @@ public class LoadQueuePeon
|
|||
String basePath,
|
||||
ObjectMapper jsonMapper,
|
||||
ScheduledExecutorService zkWritingExecutor,
|
||||
ExecutorService callbackExecutor,
|
||||
DruidCoordinatorConfig config
|
||||
)
|
||||
{
|
||||
this.curator = curator;
|
||||
this.basePath = basePath;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.callBackExecutor = callbackExecutor;
|
||||
this.zkWritingExecutor = zkWritingExecutor;
|
||||
this.config = config;
|
||||
}
|
||||
|
@ -333,8 +337,18 @@ public class LoadQueuePeon
|
|||
default:
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
currentlyProcessing.executeCallbacks();
|
||||
currentlyProcessing = null;
|
||||
|
||||
callBackExecutor.execute(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
currentlyProcessing.executeCallbacks();
|
||||
currentlyProcessing = null;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.inject.Inject;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
/**
|
||||
|
@ -33,6 +34,7 @@ public class LoadQueueTaskMaster
|
|||
private final CuratorFramework curator;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ScheduledExecutorService peonExec;
|
||||
private final ExecutorService callbackExec;
|
||||
private final DruidCoordinatorConfig config;
|
||||
|
||||
@Inject
|
||||
|
@ -40,17 +42,19 @@ public class LoadQueueTaskMaster
|
|||
CuratorFramework curator,
|
||||
ObjectMapper jsonMapper,
|
||||
ScheduledExecutorService peonExec,
|
||||
ExecutorService callbackExec,
|
||||
DruidCoordinatorConfig config
|
||||
)
|
||||
{
|
||||
this.curator = curator;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.peonExec = peonExec;
|
||||
this.callbackExec = callbackExec;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
public LoadQueuePeon giveMePeon(String basePath)
|
||||
{
|
||||
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config);
|
||||
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package io.druid.server.router;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.Iterables;
|
||||
import io.druid.query.Query;
|
||||
|
@ -27,12 +29,32 @@ import io.druid.query.Query;
|
|||
*/
|
||||
public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
|
||||
{
|
||||
private final int minPriority;
|
||||
private final int maxPriority;
|
||||
|
||||
@JsonCreator
|
||||
public PriorityTieredBrokerSelectorStrategy(
|
||||
@JsonProperty("minPriority") Integer minPriority,
|
||||
@JsonProperty("maxPriority") Integer maxPriority
|
||||
)
|
||||
{
|
||||
this.minPriority = minPriority == null ? 0 : minPriority;
|
||||
this.maxPriority = maxPriority == null ? 1 : maxPriority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
|
||||
{
|
||||
final int priority = query.getContextPriority(0);
|
||||
|
||||
if (priority < 0) {
|
||||
if (priority < minPriority) {
|
||||
return Optional.of(
|
||||
Iterables.getLast(
|
||||
tierConfig.getTierToBrokerMap().values(),
|
||||
tierConfig.getDefaultBrokerServiceName()
|
||||
)
|
||||
);
|
||||
} else if (priority >= maxPriority) {
|
||||
return Optional.of(
|
||||
Iterables.getFirst(
|
||||
tierConfig.getTierToBrokerMap().values(),
|
||||
|
|
|
@ -20,12 +20,15 @@
|
|||
package io.druid.server.router;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.druid.client.DruidServer;
|
||||
import org.joda.time.Period;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -56,7 +59,10 @@ public class TieredBrokerConfig
|
|||
|
||||
@JsonProperty
|
||||
@NotNull
|
||||
private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]";
|
||||
private List<TieredBrokerSelectorStrategy> strategies = Arrays.asList(
|
||||
new TimeBoundaryTieredBrokerSelectorStrategy(),
|
||||
new PriorityTieredBrokerSelectorStrategy(0, 1)
|
||||
);
|
||||
|
||||
// tier, <bard, numThreads>
|
||||
public LinkedHashMap<String, String> getTierToBrokerMap()
|
||||
|
@ -93,8 +99,8 @@ public class TieredBrokerConfig
|
|||
return pollPeriod;
|
||||
}
|
||||
|
||||
public String getStrategies()
|
||||
public List<TieredBrokerSelectorStrategy> getStrategies()
|
||||
{
|
||||
return strategies;
|
||||
return ImmutableList.copyOf(strategies);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,10 +19,6 @@
|
|||
|
||||
package io.druid.server.router;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.api.client.util.Lists;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Provider;
|
||||
|
||||
|
@ -32,23 +28,12 @@ import java.util.List;
|
|||
*/
|
||||
public class TieredBrokerSelectorStrategiesProvider implements Provider<List<TieredBrokerSelectorStrategy>>
|
||||
{
|
||||
private final List<TieredBrokerSelectorStrategy> strategies = Lists.newArrayList();
|
||||
private final List<TieredBrokerSelectorStrategy> strategies;
|
||||
|
||||
@Inject
|
||||
public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config)
|
||||
public TieredBrokerSelectorStrategiesProvider(TieredBrokerConfig config)
|
||||
{
|
||||
try {
|
||||
this.strategies.addAll(
|
||||
(List<TieredBrokerSelectorStrategy>) jsonMapper.readValue(
|
||||
config.getStrategies(), new TypeReference<List<TieredBrokerSelectorStrategy>>()
|
||||
{
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
this.strategies = config.getStrategies();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -73,7 +73,7 @@ public class FireDepartmentTest
|
|||
new RealtimeIOConfig(
|
||||
null,
|
||||
new RealtimePlumberSchool(
|
||||
null, null, null, null, null, null, null, null, null, null, null, null, 0
|
||||
null, null, null, null, null, null, null, null, null, null, null, null, null, 0
|
||||
)
|
||||
),
|
||||
new RealtimeTuningConfig(
|
||||
|
|
|
@ -149,6 +149,7 @@ public class RealtimePlumberSchoolTest
|
|||
Granularity.HOUR,
|
||||
new IntervalStartVersioningPolicy(),
|
||||
new NoopRejectionPolicyFactory(),
|
||||
null,
|
||||
0
|
||||
);
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon
|
|||
|
||||
public LoadQueuePeonTester()
|
||||
{
|
||||
super(null, null, null, null, null);
|
||||
super(null, null, null, null, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,7 +22,6 @@ package io.druid.server.router;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.client.DruidServer;
|
||||
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
|
@ -30,11 +29,9 @@ import io.druid.curator.discovery.ServerDiscoverySelector;
|
|||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.TableDataSource;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.spec.MultipleIntervalSegmentSpec;
|
||||
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
||||
import io.druid.server.coordinator.rules.IntervalLoadRule;
|
||||
import io.druid.server.coordinator.rules.Rule;
|
||||
import junit.framework.Assert;
|
||||
|
@ -85,7 +82,7 @@ public class TieredBrokerHostSelectorTest
|
|||
}
|
||||
},
|
||||
factory,
|
||||
Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy())
|
||||
Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1))
|
||||
);
|
||||
EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce();
|
||||
EasyMock.replay(factory);
|
||||
|
@ -198,28 +195,50 @@ public class TieredBrokerHostSelectorTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPrioritySelect() throws Exception
|
||||
{
|
||||
String brokerName = (String) brokerSelector.select(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("test")
|
||||
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
|
||||
.intervals(
|
||||
new MultipleIntervalSegmentSpec(
|
||||
Arrays.<Interval>asList(
|
||||
new Interval("2011-08-31/2011-09-01"),
|
||||
new Interval("2012-08-31/2012-09-01"),
|
||||
new Interval("2013-08-31/2013-09-01")
|
||||
)
|
||||
)
|
||||
)
|
||||
.context(ImmutableMap.<String, Object>of("priority", -1))
|
||||
.build()
|
||||
).lhs;
|
||||
public void testPrioritySelect() throws Exception
|
||||
{
|
||||
String brokerName = (String) brokerSelector.select(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("test")
|
||||
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
|
||||
.intervals(
|
||||
new MultipleIntervalSegmentSpec(
|
||||
Arrays.<Interval>asList(
|
||||
new Interval("2011-08-31/2011-09-01"),
|
||||
new Interval("2012-08-31/2012-09-01"),
|
||||
new Interval("2013-08-31/2013-09-01")
|
||||
)
|
||||
)
|
||||
)
|
||||
.context(ImmutableMap.<String, Object>of("priority", -1))
|
||||
.build()
|
||||
).lhs;
|
||||
|
||||
Assert.assertEquals("hotBroker", brokerName);
|
||||
}
|
||||
Assert.assertEquals("coldBroker", brokerName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrioritySelect2() throws Exception
|
||||
{
|
||||
String brokerName = (String) brokerSelector.select(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("test")
|
||||
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
|
||||
.intervals(
|
||||
new MultipleIntervalSegmentSpec(
|
||||
Arrays.<Interval>asList(
|
||||
new Interval("2011-08-31/2011-09-01"),
|
||||
new Interval("2012-08-31/2012-09-01"),
|
||||
new Interval("2013-08-31/2013-09-01")
|
||||
)
|
||||
)
|
||||
)
|
||||
.context(ImmutableMap.<String, Object>of("priority", 5))
|
||||
.build()
|
||||
).lhs;
|
||||
|
||||
Assert.assertEquals("hotBroker", brokerName);
|
||||
}
|
||||
|
||||
private static class TestRuleManager extends CoordinatorRuleManager
|
||||
{
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.6.139-SNAPSHOT</version>
|
||||
<version>0.6.140-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.curator.framework.CuratorFramework;
|
|||
import org.eclipse.jetty.server.Server;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -128,10 +129,16 @@ public class CliCoordinator extends ServerRunnable
|
|||
@Provides
|
||||
@LazySingleton
|
||||
public LoadQueueTaskMaster getLoadQueueTaskMaster(
|
||||
CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidCoordinatorConfig config
|
||||
CuratorFramework curator,
|
||||
ObjectMapper jsonMapper,
|
||||
ScheduledExecutorFactory factory,
|
||||
DruidCoordinatorConfig config
|
||||
)
|
||||
{
|
||||
return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config);
|
||||
return new LoadQueueTaskMaster(
|
||||
curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"),
|
||||
Executors.newSingleThreadExecutor(), config
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
Loading…
Reference in New Issue