Merge branch 'master' into igalDruid

This commit is contained in:
Igal Levy 2013-12-20 13:45:16 -08:00
commit 5efca6eea8
67 changed files with 841 additions and 349 deletions

View File

@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
echo "See also http://druid.io/docs/0.6.40"
echo "See also http://druid.io/docs/0.6.46"

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -3,7 +3,7 @@ layout: doc_page
---
# Booting a Single Node Cluster #
[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.40-bin.tar.gz).
[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.46-bin.tar.gz).
The [ec2 run script](https://github.com/metamx/druid/blob/master/examples/bin/run_ec2.sh), run_ec2.sh, is located at 'examples/bin' if you have checked out the code, or at the root of the project if you've downloaded a tarball. The scripts rely on the [Amazon EC2 API Tools](http://aws.amazon.com/developertools/351), and you will need to set three environment variables:

View File

@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.40
git checkout druid-0.6.46
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.40"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.46"]
druid.zk.service.host=localhost

View File

@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.40
cd druid-services-0.6.46
```
You should see a bunch of files:

View File

@ -45,7 +45,7 @@ With real-world data, we recommend having a message bus such as [Apache Kafka](h
<a id="set-up-kafka"></a>
#### Setting up Kafka
[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.40/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.46/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html).

View File

@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes as well as and exter
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz)
and untar the contents within by issuing:
@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.40"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -238,7 +238,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.40","io.druid.extensions:druid-kafka-seven:0.6.40"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.46","io.druid.extensions:druid-kafka-seven:0.6.46"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far right? That's great! If you cd into the directory:
```
cd druid-services-0.6.40
cd druid-services-0.6.46
```
You should see a bunch of files:

View File

@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz.
We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz.
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:

View File

@ -4,7 +4,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.40"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@ -4,7 +4,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.40","io.druid.extensions:druid-kafka-seven:0.6.40","io.druid.extensions:druid-rabbitmq:0.6.40"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.46","io.druid.extensions:druid-kafka-seven:0.6.46","io.druid.extensions:druid-rabbitmq:0.6.46"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -62,7 +62,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
try {
inputRow = parser.parse(value.toString());
}
catch (IllegalArgumentException e) {
catch (Exception e) {
if (config.isIgnoreInvalidRows()) {
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
return; // we're ignoring this invalid row

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -20,10 +20,15 @@
package io.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.ServerView;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
@ -37,10 +42,14 @@ import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
@ -167,7 +176,7 @@ public class TaskToolbox
return objectMapper;
}
public Map<DataSegment, File> getSegments(List<DataSegment> segments)
public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
throws SegmentLoadingException
{
Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
@ -178,6 +187,25 @@ public class TaskToolbox
return retVal;
}
public void pushSegments(Iterable<DataSegment> segments) throws IOException {
// Request segment pushes for each set
final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
segments,
new Function<DataSegment, Interval>()
{
@Override
public Interval apply(DataSegment segment)
{
return segment.getInterval();
}
}
);
for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
}
}
public File getTaskWorkDir()
{
return taskWorkDir;

View File

@ -19,6 +19,7 @@
package io.druid.indexing.common.actions;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskStorage;
@ -45,21 +46,21 @@ public class LocalTaskActionClient implements TaskActionClient
{
log.info("Performing action for task[%s]: %s", task.getId(), taskAction);
final RetType ret = taskAction.perform(task, toolbox);
if (taskAction.isAudited()) {
// Add audit log
try {
storage.addAuditLog(task, taskAction);
}
catch (Exception e) {
final String actionClass = taskAction.getClass().getName();
log.makeAlert(e, "Failed to record action in audit log")
.addData("task", task.getId())
.addData("actionClass", taskAction.getClass().getName())
.addData("actionClass", actionClass)
.emit();
throw new ISE(e, "Failed to record action [%s] in audit log", actionClass);
}
}
return ret;
return taskAction.perform(task, toolbox);
}
}

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
@ -80,9 +79,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
@Override
public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
if(!toolbox.taskLockCoversSegments(task, segments, allowOlderVersions)) {
throw new ISE("Segments not covered by locks for task[%s]: %s", task.getId(), segments);
}
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);

View File

@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
@ -42,10 +41,7 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
Task task, TaskActionToolbox toolbox
) throws IOException
{
if(!toolbox.taskLockCoversSegments(task, segments, true)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
// Emit metrics

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
@ -59,10 +58,7 @@ public class SegmentNukeAction implements TaskAction<Void>
@Override
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
{
if(!toolbox.taskLockCoversSegments(task, segments, true)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().deleteSegments(segments);
// Emit metrics

View File

@ -19,9 +19,11 @@
package io.druid.indexing.common.actions;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
@ -65,6 +67,38 @@ public class TaskActionToolbox
return emitter;
}
public boolean segmentsAreFromSamePartitionSet(
final Set<DataSegment> segments
)
{
// Verify that these segments are all in the same partition set
Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty");
final DataSegment firstSegment = segments.iterator().next();
for (final DataSegment segment : segments) {
if (!segment.getDataSource().equals(firstSegment.getDataSource())
|| !segment.getInterval().equals(firstSegment.getInterval())
|| !segment.getVersion().equals(firstSegment.getVersion())) {
return false;
}
}
return true;
}
public void verifyTaskLocksAndSinglePartitionSettitude(
final Task task,
final Set<DataSegment> segments,
final boolean allowOlderVersions
)
{
if (!taskLockCoversSegments(task, segments, allowOlderVersions)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
if (!segmentsAreFromSamePartitionSet(segments)) {
throw new ISE("Segments are not in the same partition set: %s", segments);
}
}
public boolean taskLockCoversSegments(
final Task task,
final Set<DataSegment> segments,

View File

@ -21,6 +21,7 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import org.joda.time.Interval;
@ -58,7 +59,8 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
)
{
super(id, groupId, taskResource, dataSource);
this.interval = interval;
this.interval = Preconditions.checkNotNull(interval, "interval");
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
}
@Override

View File

@ -22,6 +22,7 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -103,7 +104,7 @@ public class DeleteTask extends AbstractFixedIntervalTask
segment.getVersion()
);
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}

View File

@ -24,10 +24,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.indexer.HadoopDruidIndexerConfig;
@ -47,12 +51,15 @@ import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public class HadoopIndexTask extends AbstractFixedIntervalTask
{
@ -180,14 +187,10 @@ public class HadoopIndexTask extends AbstractFixedIntervalTask
if (segments != null) {
List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
segments, new TypeReference<List<DataSegment>>()
{
}
segments,
new TypeReference<List<DataSegment>>() {}
);
// Request segment pushes
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
// Done
toolbox.pushSegments(publishedSegments);
return TaskStatus.success(getId());
} else {
return TaskStatus.failure(getId());

View File

@ -156,7 +156,7 @@ public class IndexTask extends AbstractFixedIntervalTask
segments.add(segment);
}
}
toolbox.getTaskActionClient().submit(new SegmentInsertAction(segments));
toolbox.pushSegments(segments);
return TaskStatus.success(getId());
}

View File

@ -97,11 +97,9 @@ public class KillTask extends AbstractFixedIntervalTask
// Kill segments
for (DataSegment segment : unusedSegments) {
toolbox.getDataSegmentKiller().kill(segment);
toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.of(segment)));
}
// Remove metadata for these segments
toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
return TaskStatus.success(getId());
}
}

View File

@ -27,6 +27,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -142,7 +143,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
);
// download segments to merge
final Map<DataSegment, File> gettedSegments = toolbox.getSegments(segments);
final Map<DataSegment, File> gettedSegments = toolbox.fetchSegments(segments);
// merge files together
final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));
@ -165,7 +166,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}

View File

@ -98,18 +98,12 @@ public class MoveTask extends AbstractFixedIntervalTask
log.info("OK to move segment: %s", unusedSegment.getIdentifier());
}
List<DataSegment> movedSegments = Lists.newLinkedList();
// Move segments
for (DataSegment segment : unusedSegments) {
movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec));
final DataSegment movedSegment = toolbox.getDataSegmentMover().move(segment, targetLoadSpec);
toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment)));
}
// Update metadata for moved segments
toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(
ImmutableSet.copyOf(movedSegments)
));
return TaskStatus.success(getId());
}

View File

@ -35,9 +35,9 @@ import org.joda.time.DateTime;
public class NoopTask extends AbstractTask
{
private static final Logger log = new Logger(NoopTask.class);
private static int defaultRunTime = 2500;
private static int defaultIsReadyTime = 0;
private static IsReadyResult defaultIsReadyResult = IsReadyResult.YES;
private static final int defaultRunTime = 2500;
private static final int defaultIsReadyTime = 0;
private static final IsReadyResult defaultIsReadyResult = IsReadyResult.YES;
enum IsReadyResult
{
@ -139,4 +139,9 @@ public class NoopTask extends AbstractTask
log.info("Woke up!");
return TaskStatus.success(getId());
}
public static NoopTask create()
{
return new NoopTask(null, 0, 0, null, null);
}
}

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Closeables;
import com.metamx.common.exception.FormattedException;
import com.metamx.emitter.EmittingLogger;
@ -35,9 +34,7 @@ import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.LockReleaseAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
@ -212,7 +209,7 @@ public class RealtimeIndexTask extends AbstractTask
@Override
public void announceSegment(final DataSegment segment) throws IOException
{
// NOTE: Side effect: Calling announceSegment causes a lock to be acquired
// Side effect: Calling announceSegment causes a lock to be acquired
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
toolbox.getSegmentAnnouncer().announceSegment(segment);
}
@ -231,6 +228,7 @@ public class RealtimeIndexTask extends AbstractTask
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
// Side effect: Calling announceSegments causes locks to be acquired
for (DataSegment segment : segments) {
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
}
@ -263,7 +261,7 @@ public class RealtimeIndexTask extends AbstractTask
public String getVersion(final Interval interval)
{
try {
// NOTE: Side effect: Calling getVersion causes a lock to be acquired
// Side effect: Calling getVersion causes a lock to be acquired
final TaskLock myLock = toolbox.getTaskActionClient()
.submit(new LockAcquireAction(interval));
@ -418,7 +416,7 @@ public class RealtimeIndexTask extends AbstractTask
@Override
public void publishSegment(DataSegment segment) throws IOException
{
taskToolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
taskToolbox.pushSegments(ImmutableList.of(segment));
}
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
@ -248,7 +249,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
}
}
final Map<DataSegment, File> localSegments = toolbox.getSegments(Arrays.asList(segment));
final Map<DataSegment, File> localSegments = toolbox.fetchSegments(Arrays.asList(segment));
final File location = localSegments.get(segment);
final File outLocation = new File(location, "v9_out");

View File

@ -489,7 +489,18 @@ public class DbTaskStorage implements TaskStorage
final Map<Long, TaskLock> retMap = Maps.newHashMap();
for (final Map<String, Object> row : dbTaskLocks) {
retMap.put((Long) row.get("id"), jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class));
try {
retMap.put(
(Long) row.get("id"),
jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class)
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLock")
.addData("task", taskid)
.addData("lockPayload", row)
.emit();
}
}
return retMap;
}

View File

@ -42,6 +42,7 @@ import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
@ -169,39 +170,39 @@ public class IndexerDBCoordinator
private boolean announceHistoricalSegment(final Handle handle, final DataSegment segment) throws IOException
{
try {
final List<Map<String, Object>> exists = handle.createQuery(
String.format(
"SELECT id FROM %s WHERE id = :identifier",
dbTables.getSegmentsTable()
)
).bind(
"identifier",
segment.getIdentifier()
).list();
if (!exists.isEmpty()) {
if (segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
return false;
}
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable()
)
)
.bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource())
.bind("created_date", new DateTime().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", segment.getShardSpec().getPartitionNum())
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", jsonMapper.writeValueAsString(segment))
.execute();
// Try/catch to work around races due to SELECT -> INSERT. Avoid ON DUPLICATE KEY since it's not portable.
try {
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable()
)
)
.bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource())
.bind("created_date", new DateTime().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", segment.getShardSpec().getPartitionNum())
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", jsonMapper.writeValueAsString(segment))
.execute();
log.info("Published segment [%s] to DB", segment.getIdentifier());
log.info("Published segment [%s] to DB", segment.getIdentifier());
} catch (Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
} else {
throw e;
}
}
}
catch (IOException e) {
log.error(e, "Exception inserting into DB");
@ -211,6 +212,20 @@ public class IndexerDBCoordinator
return true;
}
private boolean segmentExists(final Handle handle, final DataSegment segment) {
final List<Map<String, Object>> exists = handle.createQuery(
String.format(
"SELECT id FROM %s WHERE id = :identifier",
dbTables.getSegmentsTable()
)
).bind(
"identifier",
segment.getIdentifier()
).list();
return !exists.isEmpty();
}
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
{
dbi.inTransaction(

View File

@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ -251,26 +252,26 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@Override
public Collection<ZkWorker> getWorkers()
{
return zkWorkers.values();
return ImmutableList.copyOf(zkWorkers.values());
}
@Override
public Collection<RemoteTaskRunnerWorkItem> getRunningTasks()
{
return runningTasks.values();
return ImmutableList.copyOf(runningTasks.values());
}
@Override
public Collection<RemoteTaskRunnerWorkItem> getPendingTasks()
{
return pendingTasks.values();
return ImmutableList.copyOf(pendingTasks.values());
}
@Override
public Collection<RemoteTaskRunnerWorkItem> getKnownTasks()
{
// Racey, since there is a period of time during assignment when a task is neither pending nor running
return Lists.newArrayList(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
}
public ZkWorker findWorkerRunningTask(String taskId)

View File

@ -19,6 +19,7 @@
package io.druid.indexing.overlord;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
@ -109,6 +110,11 @@ public class TaskLockbox
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
final Task task = taskAndLock.lhs;
final TaskLock savedTaskLock = taskAndLock.rhs;
if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
// "Impossible", but you never know what crazy stuff can be restored from storage.
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
continue;
}
uniqueTaskIds.add(task.getId());
final Optional<TaskLock> acquiredTaskLock = tryLock(
task,
@ -205,6 +211,7 @@ public class TaskLockbox
giant.lock();
try {
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
final String dataSource = task.getDataSource();
final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
final TaskLockPosse posseToUse;

View File

@ -28,12 +28,10 @@ import java.util.List;
public class AutoScalingData
{
private final List<String> nodeIds;
private final List nodes;
public AutoScalingData(List<String> nodeIds, List nodes)
public AutoScalingData(List<String> nodeIds)
{
this.nodeIds = nodeIds;
this.nodes = nodes;
}
@JsonProperty
@ -42,17 +40,11 @@ public class AutoScalingData
return nodeIds;
}
public List getNodes()
{
return nodes;
}
@Override
public String toString()
{
return "AutoScalingData{" +
"nodeIds=" + nodeIds +
", nodes=" + nodes +
'}';
}
}

View File

@ -125,8 +125,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
return input.getInstanceId();
}
}
),
result.getReservation().getInstances()
)
);
}
catch (Exception e) {
@ -140,7 +139,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
public AutoScalingData terminate(List<String> ips)
{
if (ips.isEmpty()) {
return new AutoScalingData(Lists.<String>newArrayList(), Lists.<Instance>newArrayList());
return new AutoScalingData(Lists.<String>newArrayList());
}
DescribeInstancesResult result = amazonEC2Client.describeInstances(
@ -184,8 +183,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
return String.format("%s:%s", input, config.getWorkerPort());
}
}
),
instances
)
);
}
catch (Exception e) {

View File

@ -20,13 +20,16 @@
package io.druid.indexing.overlord.scaling;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
@ -38,7 +41,6 @@ import org.joda.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
/**
*/
@ -48,211 +50,194 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private final AutoScalingStrategy autoScalingStrategy;
private final SimpleResourceManagementConfig config;
private final Supplier<WorkerSetupData> workerSetupdDataRef;
private final Supplier<WorkerSetupData> workerSetupDataRef;
private final ScalingStats scalingStats;
private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
private final Object lock = new Object();
private final Set<String> currentlyProvisioning = Sets.newHashSet();
private final Set<String> currentlyTerminating = Sets.newHashSet();
private volatile DateTime lastProvisionTime = new DateTime();
private volatile DateTime lastTerminateTime = new DateTime();
private int targetWorkerCount = -1;
private DateTime lastProvisionTime = new DateTime();
private DateTime lastTerminateTime = new DateTime();
@Inject
public SimpleResourceManagementStrategy(
AutoScalingStrategy autoScalingStrategy,
SimpleResourceManagementConfig config,
Supplier<WorkerSetupData> workerSetupdDataRef
Supplier<WorkerSetupData> workerSetupDataRef
)
{
this.autoScalingStrategy = autoScalingStrategy;
this.config = config;
this.workerSetupdDataRef = workerSetupdDataRef;
this.workerSetupDataRef = workerSetupDataRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
}
@Override
public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
final WorkerSetupData workerSetupData = workerSetupdDataRef.get();
final String minVersion = workerSetupData.getMinVersion() == null
? config.getWorkerVersion()
: workerSetupData.getMinVersion();
int maxNumWorkers = workerSetupData.getMaxNumWorkers();
int currValidWorkers = 0;
for (ZkWorker zkWorker : zkWorkers) {
if (zkWorker.isValidVersion(minVersion)) {
currValidWorkers++;
synchronized (lock) {
boolean didProvision = false;
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
if (workerSetupData == null) {
log.warn("No workerSetupData available, cannot provision new workers.");
return false;
}
}
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
if (currValidWorkers >= maxNumWorkers) {
log.debug(
"Cannot scale anymore. Num workers = %d, Max num workers = %d",
zkWorkers.size(),
workerSetupdDataRef.get().getMaxNumWorkers()
);
return false;
}
List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.<ZkWorker, String>transform(
zkWorkers,
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker input)
final List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.<ZkWorker, String>transform(
zkWorkers,
new Function<ZkWorker, String>()
{
return input.getWorker().getIp();
@Override
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
}
}
)
)
);
)
)
);
currentlyProvisioning.removeAll(workerNodeIds);
currentlyProvisioning.removeAll(workerNodeIds);
boolean nothingProvisioning = currentlyProvisioning.isEmpty();
updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers);
if (nothingProvisioning) {
if (hasTaskPendingBeyondThreshold(pendingTasks)) {
AutoScalingData provisioned = autoScalingStrategy.provision();
if (provisioned != null) {
currentlyProvisioning.addAll(provisioned.getNodeIds());
int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
while (want > 0) {
final AutoScalingData provisioned = autoScalingStrategy.provision();
final List<String> newNodes;
if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) {
break;
} else {
currentlyProvisioning.addAll(newNodes);
lastProvisionTime = new DateTime();
scalingStats.addProvisionEvent(provisioned);
return true;
want -= provisioned.getNodeIds().size();
didProvision = true;
}
}
} else {
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
log.info(
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker. Current wait time: %s",
currentlyProvisioning,
durSinceLastProvision
);
if (!currentlyProvisioning.isEmpty()) {
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node provisioning taking too long!")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision);
List<String> nodeIps = autoScalingStrategy.idToIpLookup(Lists.newArrayList(currentlyProvisioning));
autoScalingStrategy.terminate(nodeIps);
currentlyProvisioning.clear();
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node provisioning taking too long!")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
List<String> nodeIps = autoScalingStrategy.idToIpLookup(Lists.newArrayList(currentlyProvisioning));
autoScalingStrategy.terminate(nodeIps);
currentlyProvisioning.clear();
}
}
}
return false;
return didProvision;
}
}
@Override
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{
Set<String> workerNodeIds = Sets.newHashSet(
autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
zkWorkers,
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker input)
synchronized (lock) {
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
if (workerSetupData == null) {
log.warn("No workerSetupData available, cannot terminate workers.");
return false;
}
boolean didTerminate = false;
final Set<String> workerNodeIds = Sets.newHashSet(
autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
zkWorkers,
new Function<ZkWorker, String>()
{
return input.getWorker().getIp();
@Override
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
}
}
)
)
)
);
Set<String> stillExisting = Sets.newHashSet();
for (String s : currentlyTerminating) {
if (workerNodeIds.contains(s)) {
stillExisting.add(s);
}
}
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
boolean nothingTerminating = currentlyTerminating.isEmpty();
if (nothingTerminating) {
final int minNumWorkers = workerSetupdDataRef.get().getMinNumWorkers();
if (zkWorkers.size() <= minNumWorkers) {
log.info("Only [%d <= %d] nodes in the cluster, not terminating anything.", zkWorkers.size(), minNumWorkers);
return false;
}
List<ZkWorker> thoseLazyWorkers = Lists.newArrayList(
FunctionalIterable
.create(zkWorkers)
.filter(
new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker input)
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
}
}
)
)
);
int maxPossibleNodesTerminated = zkWorkers.size() - minNumWorkers;
int numNodesToTerminate = Math.min(maxPossibleNodesTerminated, thoseLazyWorkers.size());
if (numNodesToTerminate <= 0) {
log.info("Found no nodes to terminate.");
return false;
}
AutoScalingData terminated = autoScalingStrategy.terminate(
Lists.transform(
thoseLazyWorkers.subList(0, numNodesToTerminate),
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
}
)
);
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
scalingStats.addTerminateEvent(terminated);
return true;
final Set<String> stillExisting = Sets.newHashSet();
for (String s : currentlyTerminating) {
if (workerNodeIds.contains(s)) {
stillExisting.add(s);
}
}
} else {
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
log.info(
"%s still terminating. Wait for all nodes to terminate before trying again.",
currentlyTerminating
);
updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node termination taking too long!")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
.emit();
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
if (currentlyTerminating.isEmpty()) {
final int excessWorkers = (zkWorkers.size() + currentlyProvisioning.size()) - targetWorkerCount;
if (excessWorkers > 0) {
final List<String> laziestWorkerIps =
FluentIterable.from(zkWorkers)
.filter(isLazyWorker)
.limit(excessWorkers)
.transform(
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker zkWorker)
{
return zkWorker.getWorker().getIp();
}
}
)
.toList();
currentlyTerminating.clear();
if (laziestWorkerIps.isEmpty()) {
log.info("Wanted to terminate %,d workers, but couldn't find any lazy ones!", excessWorkers);
} else {
log.info(
"Terminating %,d workers (wanted %,d): %s",
laziestWorkerIps.size(),
excessWorkers,
Joiner.on(", ").join(laziestWorkerIps)
);
final AutoScalingData terminated = autoScalingStrategy.terminate(laziestWorkerIps);
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
scalingStats.addTerminateEvent(terminated);
didTerminate = true;
}
}
}
} else {
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());
log.info("%s terminating. Current wait time: %s", currentlyTerminating, durSinceLastTerminate);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node termination taking too long!")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
.emit();
currentlyTerminating.clear();
}
}
return didTerminate;
}
return false;
}
@Override
@ -261,16 +246,128 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return scalingStats;
}
private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks)
private static Predicate<ZkWorker> createLazyWorkerPredicate(
final SimpleResourceManagementConfig config,
final WorkerSetupData workerSetupData
)
{
long now = System.currentTimeMillis();
for (TaskRunnerWorkItem pendingTask : pendingTasks) {
final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
final Duration timeoutDuration = config.getPendingTaskTimeout().toStandardDuration();
if (durationSinceInsertion.isEqual(timeoutDuration) || durationSinceInsertion.isLongerThan(timeoutDuration)) {
return true;
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);
return new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker worker)
{
final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
return worker.getRunningTasks().isEmpty() && (itHasBeenAWhile || !isValidWorker.apply(worker));
}
};
}
private static Predicate<ZkWorker> createValidWorkerPredicate(
final SimpleResourceManagementConfig config,
final WorkerSetupData workerSetupData
)
{
return new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker zkWorker)
{
final String minVersion = workerSetupData.getMinVersion() != null
? workerSetupData.getMinVersion()
: config.getWorkerVersion();
if (minVersion == null) {
throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
}
return zkWorker.isValidVersion(minVersion);
}
};
}
private void updateTargetWorkerCount(
final WorkerSetupData workerSetupData,
final Collection<RemoteTaskRunnerWorkItem> pendingTasks,
final Collection<ZkWorker> zkWorkers
)
{
synchronized (lock) {
final Collection<ZkWorker> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(config, workerSetupData)
);
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
if (targetWorkerCount < 0) {
// Initialize to size of current worker pool, subject to pool size limits
targetWorkerCount = Math.max(
Math.min(
zkWorkers.size(),
workerSetupData.getMaxNumWorkers()
),
workerSetupData.getMinNumWorkers()
);
log.info(
"Starting with a target of %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
}
final boolean atSteadyState = currentlyProvisioning.isEmpty()
&& currentlyTerminating.isEmpty()
&& validWorkers.size() == targetWorkerCount;
final boolean shouldScaleUp = atSteadyState
&& hasTaskPendingBeyondThreshold(pendingTasks)
&& targetWorkerCount < workerSetupData.getMaxNumWorkers();
final boolean shouldScaleDown = atSteadyState
&& Iterables.any(validWorkers, isLazyWorker)
&& targetWorkerCount > workerSetupData.getMinNumWorkers();
if (shouldScaleUp) {
targetWorkerCount++;
log.info(
"I think we should scale up to %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
} else if (shouldScaleDown) {
targetWorkerCount--;
log.info(
"I think we should scale down to %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
} else {
log.info(
"Our target is %,d workers, and I'm okay with that (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
}
}
return false;
}
private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks)
{
synchronized (lock) {
long now = System.currentTimeMillis();
for (TaskRunnerWorkItem pendingTask : pendingTasks) {
final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
final Duration timeoutDuration = config.getPendingTaskTimeout().toStandardDuration();
if (durationSinceInsertion.isEqual(timeoutDuration) || durationSinceInsertion.isLongerThan(timeoutDuration)) {
return true;
}
}
return false;
}
}
}

View File

@ -44,7 +44,7 @@ public class TestMergeTask extends MergeTask
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new Interval(new DateTime(), new DateTime().plus(1)),
new DateTime().toString(),
null,
null,

View File

@ -126,13 +126,11 @@ public class EC2AutoScalingStrategyTest
AutoScalingData created = strategy.provision();
Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals(created.getNodes().size(), 1);
Assert.assertEquals("theInstance", created.getNodeIds().get(0));
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));
Assert.assertEquals(deleted.getNodeIds().size(), 1);
Assert.assertEquals(deleted.getNodes().size(), 1);
Assert.assertEquals(String.format("%s:8080", IP), deleted.getNodeIds().get(0));
}
}

View File

@ -19,6 +19,7 @@
package io.druid.indexing.overlord.scaling;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -28,6 +29,7 @@ import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker;
@ -63,7 +65,7 @@ public class SimpleResourceManagementStrategyTest
public void setUp() throws Exception
{
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
workerSetupData = new AtomicReference<WorkerSetupData>(
workerSetupData = new AtomicReference<>(
new WorkerSetupData(
"0", 0, 2, null, null, null
)
@ -105,7 +107,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList(), Lists.newArrayList())
new AutoScalingData(Lists.<String>newArrayList("aNode"))
);
EasyMock.replay(autoScalingStrategy);
@ -133,7 +135,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"), Lists.newArrayList("faker"))
new AutoScalingData(Lists.<String>newArrayList("fake"))
);
EasyMock.replay(autoScalingStrategy);
@ -190,7 +192,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject()))
.andReturn(null);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"), Lists.newArrayList("faker"))
new AutoScalingData(Lists.<String>newArrayList("fake"))
);
EasyMock.replay(autoScalingStrategy);
@ -242,7 +244,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList(), Lists.newArrayList())
new AutoScalingData(Lists.<String>newArrayList())
);
EasyMock.replay(autoScalingStrategy);
@ -272,7 +274,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2);
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList("ip"), Lists.newArrayList("ip"))
new AutoScalingData(Lists.<String>newArrayList("ip"))
);
EasyMock.replay(autoScalingStrategy);
@ -309,15 +311,174 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testNoActionNeeded() throws Exception
{
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testMinVersionIncrease() throws Exception
{
// Don't terminate anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h1", "i2", "0")
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
// Don't provision anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Increase minVersion
workerSetupData.set(new WorkerSetupData("1", 0, 2, null, null, null));
// Provision two new workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3"))
);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4"))
);
EasyMock.replay(autoScalingStrategy);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h2", "i2", "0")
)
);
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Terminate old workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(ImmutableList.of("i1", "i2", "i3", "i4"))).andReturn(
ImmutableList.of("h1", "h2", "h3", "h4")
);
EasyMock.expect(autoScalingStrategy.terminate(ImmutableList.of("i1", "i2"))).andReturn(
new AutoScalingData(ImmutableList.of("h1", "h2"))
);
EasyMock.replay(autoScalingStrategy);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(null, "h1", "i1", "0"),
new TestZkWorker(null, "h2", "i2", "0"),
new TestZkWorker(NoopTask.create(), "h3", "i3", "1"),
new TestZkWorker(NoopTask.create(), "h4", "i4", "1")
)
);
Assert.assertTrue(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testNullWorkerSetupData() throws Exception
{
workerSetupData.set(null);
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
);
Assert.assertFalse(terminatedSomething);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
}
private static class TestZkWorker extends ZkWorker
{
private final Task testTask;
private TestZkWorker(
public TestZkWorker(
Task testTask
)
{
super(new Worker("host", "ip", 3, "version"), null, new DefaultObjectMapper());
this(testTask, "host", "ip", "0");
}
public TestZkWorker(
Task testTask,
String host,
String ip,
String version
)
{
super(new Worker(host, ip, 3, version), null, new DefaultObjectMapper());
this.testTask = testTask;
}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
@ -73,7 +73,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.2.6</version>
<version>0.2.7</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -114,14 +114,18 @@ public class S3DataSegmentMover implements DataSegmentMover
public Void call() throws Exception
{
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
log.info(
"Moving file[s3://%s/%s] to [s3://%s/%s]",
s3Bucket,
s3Path,
targetS3Bucket,
targetS3Path
);
s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, new S3Object(targetS3Path), false);
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
} else {
log.info(
"Moving file[s3://%s/%s] to [s3://%s/%s]",
s3Bucket,
s3Path,
targetS3Bucket,
targetS3Path
);
s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, new S3Object(targetS3Path), false);
}
} else {
// ensure object exists in target location
if (s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) {

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -84,6 +84,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -101,11 +102,26 @@ public class Initialization
"io.druid",
"com.metamx.druid"
);
private final static Map<Class, Set> extensionsMap = Maps.<Class, Set>newHashMap();
public synchronized static <T> List<T> getFromExtensions(ExtensionsConfig config, Class<T> clazz)
/**
* @param clazz Module class
* @param <T>
* @return Returns the set of modules loaded.
*/
public static<T> Set<T> getLoadedModules(Class<T> clazz)
{
Set<T> retVal = extensionsMap.get(clazz);
if (retVal == null) {
return Sets.newHashSet();
}
return retVal;
}
public synchronized static <T> Collection<T> getFromExtensions(ExtensionsConfig config, Class<T> clazz)
{
final TeslaAether aether = getAetherClient(config);
List<T> retVal = Lists.newArrayList();
Set<T> retVal = Sets.newHashSet();
if (config.searchCurrentClassloader()) {
for (T module : ServiceLoader.load(clazz, Initialization.class.getClassLoader())) {
@ -131,6 +147,9 @@ public class Initialization
}
}
// update the map with currently loaded modules
extensionsMap.put(clazz, retVal);
return retVal;
}

View File

@ -712,7 +712,7 @@ public class RealtimePlumberSchool implements PlumberSchool
handoffCondition.notifyAll();
}
}
catch (IOException e) {
catch (Exception e) {
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.emit();

View File

@ -19,12 +19,17 @@
package io.druid.server;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.initialization.DruidModule;
import io.druid.initialization.Initialization;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
/**
*/
@ -35,28 +40,20 @@ public class StatusResource
@Produces("application/json")
public Status doGet()
{
return getStatus();
}
public static Status getStatus()
{
return new Status(
Initialization.class.getPackage().getImplementationVersion(),
new Memory(Runtime.getRuntime())
);
return new Status();
}
public static class Status
{
final String version;
final List<ModuleVersion> modules;
final Memory memory;
public Status(
String version, Memory memory
)
public Status()
{
this.version = version;
this.memory = memory;
this.version = Status.class.getPackage().getImplementationVersion();
this.modules = getExtensionVersions();
this.memory = new Memory(Runtime.getRuntime());
}
@JsonProperty
@ -65,6 +62,12 @@ public class StatusResource
return version;
}
@JsonProperty
public List<ModuleVersion> getModules()
{
return modules;
}
@JsonProperty
public Memory getMemory()
{
@ -75,21 +78,48 @@ public class StatusResource
public String toString()
{
final String NL = System.getProperty("line.separator");
return String.format("Druid version - %s", version) + NL;
StringBuilder output = new StringBuilder();
output.append(String.format("Druid version - %s", version)).append(NL).append(NL);
if (modules.size() > 0) {
output.append("Registered Druid Modules").append(NL);
} else {
output.append("No Druid Modules loaded !");
}
for (ModuleVersion moduleVersion : modules) {
output.append(moduleVersion).append(NL);
}
return output.toString();
}
/**
* Load the unique extensions and return their implementation-versions
*
* @return map of extensions loaded with their respective implementation versions.
*/
private List<ModuleVersion> getExtensionVersions()
{
final Set<DruidModule> druidModules = Initialization.getLoadedModules(DruidModule.class);
List<ModuleVersion> moduleVersions = new ArrayList<>();
for (DruidModule module : druidModules) {
String artifact = module.getClass().getPackage().getImplementationTitle();
String version = module.getClass().getPackage().getImplementationVersion();
moduleVersions.add(new ModuleVersion(module.getClass().getCanonicalName(), artifact, version));
}
return moduleVersions;
}
}
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class ModuleVersion
{
final String name;
final String artifact;
final String version;
public ModuleVersion(String name)
{
this(name, "", "");
}
public ModuleVersion(String name, String artifact, String version)
{
this.name = name;
@ -118,7 +148,7 @@ public class StatusResource
@Override
public String toString()
{
if (artifact.isEmpty()) {
if (artifact == null || artifact.isEmpty()) {
return String.format(" - %s ", name);
} else {
return String.format(" - %s (%s-%s)", name, artifact, version);
@ -164,6 +194,5 @@ public class StatusResource
{
return usedMemory;
}
}
}

View File

@ -0,0 +1,103 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import io.druid.initialization.DruidModule;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
import junit.framework.Assert;
import org.junit.Test;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import static io.druid.server.StatusResource.ModuleVersion;
/**
*/
public class StatusResourceTest
{
private Collection<DruidModule> loadTestModule()
{
Injector baseInjector = Initialization.makeStartupInjector();
return Initialization.getFromExtensions(baseInjector.getInstance(ExtensionsConfig.class), DruidModule.class);
}
@Test
public void testLoadedModules()
{
final StatusResource resource = new StatusResource();
List<ModuleVersion> statusResourceModuleList;
statusResourceModuleList = resource.doGet().getModules();
Assert.assertEquals(
"No Modules should be loaded currently! " + statusResourceModuleList,
statusResourceModuleList.size(), 0
);
Collection<DruidModule> modules = loadTestModule();
statusResourceModuleList = resource.doGet().getModules();
Assert.assertEquals("Status should have all modules loaded!", statusResourceModuleList.size(), modules.size());
for (DruidModule module : modules) {
String moduleName = module.getClass().getCanonicalName();
boolean contains = Boolean.FALSE;
for (ModuleVersion version : statusResourceModuleList) {
if (version.getName().equals(moduleName)) {
contains = Boolean.TRUE;
}
}
Assert.assertTrue("Status resource should contain module " + moduleName, contains);
}
/*
* StatusResource only uses Initialization.getLoadedModules
*/
for (int i = 0; i < 5; i++) {
Set<DruidModule> loadedModules = Initialization.getLoadedModules(DruidModule.class);
Assert.assertEquals("Set from loaded module should be same!", loadedModules, modules);
}
}
public static class TestDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
// Do nothing
}
}
}

View File

@ -0,0 +1 @@
io.druid.server.StatusResourceTest$TestDruidModule

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.42-SNAPSHOT</version>
<version>0.6.47-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -53,7 +53,7 @@ import java.util.List;
*/
@Command(
name = "broker",
description = "Runs a broker node, see http://druid.io/docs/0.6.40/Broker.html for a description"
description = "Runs a broker node, see http://druid.io/docs/0.6.46/Broker.html for a description"
)
public class CliBroker extends ServerRunnable
{

View File

@ -60,7 +60,7 @@ import java.util.List;
*/
@Command(
name = "coordinator",
description = "Runs the Coordinator, see http://druid.io/docs/0.6.40/Coordinator.html for a description."
description = "Runs the Coordinator, see http://druid.io/docs/0.6.46/Coordinator.html for a description."
)
public class CliCoordinator extends ServerRunnable
{

View File

@ -41,7 +41,7 @@ import java.util.List;
*/
@Command(
name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.40/Batch-ingestion.html for a description."
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.46/Batch-ingestion.html for a description."
)
public class CliHadoopIndexer implements Runnable
{

View File

@ -42,7 +42,7 @@ import java.util.List;
*/
@Command(
name = "historical",
description = "Runs a Historical node, see http://druid.io/docs/0.6.40/Historical.html for a description"
description = "Runs a Historical node, see http://druid.io/docs/0.6.46/Historical.html for a description"
)
public class CliHistorical extends ServerRunnable
{

View File

@ -93,7 +93,7 @@ import java.util.List;
*/
@Command(
name = "overlord",
description = "Runs an Overlord node, see http://druid.io/docs/0.6.40/Indexing-Service.html for a description"
description = "Runs an Overlord node, see http://druid.io/docs/0.6.46/Indexing-Service.html for a description"
)
public class CliOverlord extends ServerRunnable
{

View File

@ -30,7 +30,7 @@ import java.util.List;
*/
@Command(
name = "realtime",
description = "Runs a realtime node, see http://druid.io/docs/0.6.40/Realtime.html for a description"
description = "Runs a realtime node, see http://druid.io/docs/0.6.46/Realtime.html for a description"
)
public class CliRealtime extends ServerRunnable
{

View File

@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
*/
@Command(
name = "realtime",
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.40/Realtime.html for a description"
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.46/Realtime.html for a description"
)
public class CliRealtimeExample extends ServerRunnable
{

View File

@ -30,6 +30,7 @@ import io.druid.server.initialization.ExtensionsConfig;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import java.util.Collection;
import java.util.List;
/**
@ -75,7 +76,7 @@ public class Main
final Injector injector = Initialization.makeStartupInjector();
final ExtensionsConfig config = injector.getInstance(ExtensionsConfig.class);
final List<CliCommandCreator> extensionCommands = Initialization.getFromExtensions(config, CliCommandCreator.class);
final Collection<CliCommandCreator> extensionCommands = Initialization.getFromExtensions(config, CliCommandCreator.class);
for (CliCommandCreator creator : extensionCommands) {
creator.addCommands(builder);

View File

@ -31,6 +31,6 @@ public class Version implements Runnable
@Override
public void run()
{
System.out.println(StatusResource.getStatus());
System.out.println(new StatusResource.Status());
}
}