Merge branch 'master' into druid-0.7.x

This commit is contained in:
fjy 2014-08-08 12:41:49 -07:00
commit 5c3d8b66de
47 changed files with 192 additions and 113 deletions

View File

@ -17,4 +17,6 @@ We host documentation on our [website](http://druid.io/docs/latest/). If you wan
We have a series of tutorials to get started with Druid, starting with this [one](http://druid.io/docs/latest/Tutorial:-A-First-Look-at-Druid.html).
### Support
Contact us through our [forum](https://groups.google.com/forum/#!forum/druid-development) or on IRC in #druid-dev on irc.freenode.net.
Report any bugs using [GitHub issues](https://github.com/metamx/druid/issues).
Contact us through our [forum](https://groups.google.com/forum/#!forum/druid-development) or on IRC in `#druid-dev` on `irc.freenode.net`.

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.138
git checkout druid-0.6.139
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
- Update realtime node's configs for Kafka 8 extensions
- e.g.
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.138",...]`
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.139",...]`
- becomes
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.138",...]`
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.139",...]`
- Update realtime task config for changed keys
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` changes.

View File

@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/overlord
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/middlemanager
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/historical
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
druid.zk.service.host=localhost
@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@ -28,7 +28,7 @@ Configuration:
-Ddruid.zk.service.host=localhost
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.138
cd druid-services-0.6.139
```
You should see a bunch of files:

View File

@ -91,7 +91,7 @@ druid.service=overlord
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz)
and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.138
cd druid-services-0.6.139
```
You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
# Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.138-bin.tar.gz).
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.139-bin.tar.gz).
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:

View File

@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.139"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@ -4,7 +4,7 @@ druid.service=overlord
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.139"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.138","io.druid.extensions:druid-kafka-seven:0.6.138","io.druid.extensions:druid-rabbitmq:0.6.138"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.139","io.druid.extensions:druid-kafka-seven:0.6.139","io.druid.extensions:druid-rabbitmq:0.6.139"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -115,7 +115,8 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory
)
{
super(
@ -142,7 +143,7 @@ public class RealtimeIndexTask extends AbstractTask
windowPeriod,
null,
null,
rejectionPolicyFactory,
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
maxPendingPersists,
spec.getShardSpec()
),
@ -315,6 +316,7 @@ public class RealtimeIndexTask extends AbstractTask
null,
null,
null,
null,
0
);

View File

@ -54,6 +54,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
null,
1,
null,
null,
null
);
this.status = status;

View File

@ -207,6 +207,7 @@ public class TaskSerdeTest
new Period("PT10M"),
1,
Granularity.HOUR,
null,
null
);

View File

@ -49,6 +49,7 @@ public class TaskAnnouncementTest
new Period("PT10M"),
1,
Granularity.HOUR,
null,
null
);
final TaskStatus status = TaskStatus.running(task.getId());

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -185,7 +185,7 @@ public class OrderByColumnSpec
final byte[] dimensionBytes = dimension.getBytes();
final byte[] directionBytes = direction.name().getBytes();
return ByteBuffer.allocate(dimensionBytes.length + dimensionBytes.length)
return ByteBuffer.allocate(dimensionBytes.length + directionBytes.length)
.put(dimensionBytes)
.put(directionBytes)
.array();

View File

@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -65,7 +65,8 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
@ -81,6 +82,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
basePersistDirectory,
segmentGranularity,
versioningPolicy,
rejectionPolicy,
rejectionPolicyFactory,
maxPendingPersists
);

View File

@ -328,7 +328,6 @@ public class RealtimePlumber implements Plumber
return;
}
File mergedFile = null;
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
@ -338,7 +337,7 @@ public class RealtimePlumber implements Plumber
indexes.add(queryableIndex);
}
mergedFile = IndexMerger.mergeQueryableIndex(
final File mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
mergedTarget
@ -353,23 +352,17 @@ public class RealtimePlumber implements Plumber
segmentPublisher.publishSegment(segment);
}
catch (IOException e) {
catch (Exception e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval)
.emit();
if (shuttingDown) {
// We're trying to shut down, and this segment failed to push. Let's just get rid of it.
// This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
abandonSegment(truncatedTime, sink);
}
}
if (mergedFile != null) {
try {
log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile);
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile);
} else {
// Delete any possibly-partially-written files, so we can try again on the next push cycle.
removeMergedSegment(sink);
}
}
}
@ -648,13 +641,15 @@ public class RealtimePlumber implements Plumber
}
/**
* Unannounces a given sink and removes all local references to it.
* Unannounces a given sink and removes all local references to it. It is important that this is only called
* from the single-threaded mergeExecutor, since otherwise chaos may ensue if merged segments are deleted while
* being created.
*/
protected void abandonSegment(final long truncatedTime, final Sink sink)
{
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
removeMergedSegment(sink);
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime);
sinkTimeline.remove(
@ -666,7 +661,7 @@ public class RealtimePlumber implements Plumber
handoffCondition.notifyAll();
}
}
catch (IOException e) {
catch (Exception e) {
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();
@ -802,4 +797,20 @@ public class RealtimePlumber implements Plumber
}
);
}
private void removeMergedSegment(final Sink sink)
{
final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged");
if (mergedTarget.exists()) {
try {
log.info("Deleting Index File[%s]", mergedTarget);
FileUtils.deleteDirectory(mergedTarget);
}
catch (Exception e) {
log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();
}
}
}
}

View File

@ -75,7 +75,8 @@ public class RealtimePlumberSchool implements PlumberSchool
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("maxPendingPersists") int maxPendingPersists
)
{
@ -90,7 +91,7 @@ public class RealtimePlumberSchool implements PlumberSchool
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = versioningPolicy;
this.rejectionPolicyFactory = rejectionPolicyFactory;
this.rejectionPolicyFactory = (rejectionPolicy == null) ? rejectionPolicyFactory : rejectionPolicy;
this.maxPendingPersists = maxPendingPersists;
}

View File

@ -45,6 +45,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -73,6 +74,7 @@ public class LoadQueuePeon
private final String basePath;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService zkWritingExecutor;
private final ExecutorService callBackExecutor;
private final DruidCoordinatorConfig config;
private final AtomicLong queuedSize = new AtomicLong(0);
@ -94,12 +96,14 @@ public class LoadQueuePeon
String basePath,
ObjectMapper jsonMapper,
ScheduledExecutorService zkWritingExecutor,
ExecutorService callbackExecutor,
DruidCoordinatorConfig config
)
{
this.curator = curator;
this.basePath = basePath;
this.jsonMapper = jsonMapper;
this.callBackExecutor = callbackExecutor;
this.zkWritingExecutor = zkWritingExecutor;
this.config = config;
}
@ -333,10 +337,20 @@ public class LoadQueuePeon
default:
throw new UnsupportedOperationException();
}
callBackExecutor.execute(
new Runnable()
{
@Override
public void run()
{
currentlyProcessing.executeCallbacks();
currentlyProcessing = null;
}
}
);
}
}
public void stop()
{

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -33,6 +34,7 @@ public class LoadQueueTaskMaster
private final CuratorFramework curator;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService peonExec;
private final ExecutorService callbackExec;
private final DruidCoordinatorConfig config;
@Inject
@ -40,17 +42,19 @@ public class LoadQueueTaskMaster
CuratorFramework curator,
ObjectMapper jsonMapper,
ScheduledExecutorService peonExec,
ExecutorService callbackExec,
DruidCoordinatorConfig config
)
{
this.curator = curator;
this.jsonMapper = jsonMapper;
this.peonExec = peonExec;
this.callbackExec = callbackExec;
this.config = config;
}
public LoadQueuePeon giveMePeon(String basePath)
{
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config);
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config);
}
}

View File

@ -19,6 +19,8 @@
package io.druid.server.router;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.Query;
@ -27,12 +29,32 @@ import io.druid.query.Query;
*/
public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
{
private final int minPriority;
private final int maxPriority;
@JsonCreator
public PriorityTieredBrokerSelectorStrategy(
@JsonProperty("minPriority") Integer minPriority,
@JsonProperty("maxPriority") Integer maxPriority
)
{
this.minPriority = minPriority == null ? 0 : minPriority;
this.maxPriority = maxPriority == null ? 1 : maxPriority;
}
@Override
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{
final int priority = query.getContextPriority(0);
if (priority < 0) {
if (priority < minPriority) {
return Optional.of(
Iterables.getLast(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
)
);
} else if (priority >= maxPriority) {
return Optional.of(
Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),

View File

@ -20,12 +20,15 @@
package io.druid.server.router;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer;
import org.joda.time.Period;
import javax.validation.constraints.NotNull;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
/**
*/
@ -56,7 +59,10 @@ public class TieredBrokerConfig
@JsonProperty
@NotNull
private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]";
private List<TieredBrokerSelectorStrategy> strategies = Arrays.asList(
new TimeBoundaryTieredBrokerSelectorStrategy(),
new PriorityTieredBrokerSelectorStrategy(0, 1)
);
// tier, <bard, numThreads>
public LinkedHashMap<String, String> getTierToBrokerMap()
@ -93,8 +99,8 @@ public class TieredBrokerConfig
return pollPeriod;
}
public String getStrategies()
public List<TieredBrokerSelectorStrategy> getStrategies()
{
return strategies;
return ImmutableList.copyOf(strategies);
}
}

View File

@ -19,10 +19,6 @@
package io.druid.server.router;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Lists;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.Provider;
@ -32,23 +28,12 @@ import java.util.List;
*/
public class TieredBrokerSelectorStrategiesProvider implements Provider<List<TieredBrokerSelectorStrategy>>
{
private final List<TieredBrokerSelectorStrategy> strategies = Lists.newArrayList();
private final List<TieredBrokerSelectorStrategy> strategies;
@Inject
public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config)
public TieredBrokerSelectorStrategiesProvider(TieredBrokerConfig config)
{
try {
this.strategies.addAll(
(List<TieredBrokerSelectorStrategy>) jsonMapper.readValue(
config.getStrategies(), new TypeReference<List<TieredBrokerSelectorStrategy>>()
{
}
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
this.strategies = config.getStrategies();
}
@Override

View File

@ -73,7 +73,7 @@ public class FireDepartmentTest
new RealtimeIOConfig(
null,
new RealtimePlumberSchool(
null, null, null, null, null, null, null, null, null, null, null, null, 0
null, null, null, null, null, null, null, null, null, null, null, null, null, 0
)
),
new RealtimeTuningConfig(

View File

@ -149,6 +149,7 @@ public class RealtimePlumberSchoolTest
Granularity.HOUR,
new IntervalStartVersioningPolicy(),
new NoopRejectionPolicyFactory(),
null,
0
);

View File

@ -29,7 +29,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon
public LoadQueuePeonTester()
{
super(null, null, null, null, null);
super(null, null, null, null, null, null);
}
@Override

View File

@ -22,7 +22,6 @@ package io.druid.server.router;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Pair;
import com.metamx.http.client.HttpClient;
import io.druid.client.DruidServer;
import io.druid.curator.discovery.ServerDiscoveryFactory;
@ -30,11 +29,9 @@ import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.query.Druids;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.Rule;
import junit.framework.Assert;
@ -85,7 +82,7 @@ public class TieredBrokerHostSelectorTest
}
},
factory,
Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy())
Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1))
);
EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce();
EasyMock.replay(factory);
@ -217,9 +214,31 @@ public class TieredBrokerHostSelectorTest
.build()
).lhs;
Assert.assertEquals("hotBroker", brokerName);
Assert.assertEquals("coldBroker", brokerName);
}
@Test
public void testPrioritySelect2() throws Exception
{
String brokerName = (String) brokerSelector.select(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
.intervals(
new MultipleIntervalSegmentSpec(
Arrays.<Interval>asList(
new Interval("2011-08-31/2011-09-01"),
new Interval("2012-08-31/2012-09-01"),
new Interval("2013-08-31/2013-09-01")
)
)
)
.context(ImmutableMap.<String, Object>of("priority", 5))
.build()
).lhs;
Assert.assertEquals("hotBroker", brokerName);
}
private static class TestRuleManager extends CoordinatorRuleManager
{

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.139-SNAPSHOT</version>
<version>0.6.140-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -61,6 +61,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;
import java.util.List;
import java.util.concurrent.Executors;
/**
*/
@ -128,10 +129,16 @@ public class CliCoordinator extends ServerRunnable
@Provides
@LazySingleton
public LoadQueueTaskMaster getLoadQueueTaskMaster(
CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidCoordinatorConfig config
CuratorFramework curator,
ObjectMapper jsonMapper,
ScheduledExecutorFactory factory,
DruidCoordinatorConfig config
)
{
return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config);
return new LoadQueueTaskMaster(
curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"),
Executors.newSingleThreadExecutor(), config
);
}
}
);