Merge branch 'master' into az

fjy 2014-01-02 10:53:37 -08:00
commit 488a118f3f
68 changed files with 870 additions and 595 deletions

View File

@@ -30,4 +30,4 @@ echo "For examples, see: "
 echo " "
 ls -1 examples/*/*sh
 echo " "
-echo "See also http://druid.io/docs/0.6.40"
+echo "See also http://druid.io/docs/0.6.46"

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.43-SNAPSHOT</version>
+        <version>0.6.47-SNAPSHOT</version>
     </parent>
     <dependencies>

View File

@@ -28,7 +28,7 @@
    <parent>
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.43-SNAPSHOT</version>
+        <version>0.6.47-SNAPSHOT</version>
    </parent>
    <dependencies>
@@ -59,14 +59,6 @@
            <groupId>org.skife.config</groupId>
            <artifactId>config-magic</artifactId>
        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-x-discovery</artifactId>
-        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-validator</artifactId>
@@ -75,10 +67,6 @@
            <groupId>javax.validation</groupId>
            <artifactId>validation-api</artifactId>
        </dependency>
-        <dependency>
-            <groupId>it.uniroma3.mat</groupId>
-            <artifactId>extendedset</artifactId>
-        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
@@ -127,16 +115,6 @@
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </dependency>
-        <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-            <version>5.1.18</version>
-        </dependency>
-        <dependency>
-            <groupId>org.mozilla</groupId>
-            <artifactId>rhino</artifactId>
-            <version>1.7R4</version>
-        </dependency>
        <!-- Tests -->
        <dependency>

View File

@@ -3,7 +3,7 @@ layout: doc_page
 ---
 # Booting a Single Node Cluster #
-[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.40-bin.tar.gz).
+[Loading Your Data](Tutorial%3A-Loading-Your-Data-Part-2.html) and [All About Queries](Tutorial%3A-All-About-Queries.html) contain recipes to boot a small druid cluster on localhost. Here we will boot a small cluster on EC2. You can checkout the code, or download a tarball from [here](http://static.druid.io/artifacts/druid-services-0.6.46-bin.tar.gz).
 The [ec2 run script](https://github.com/metamx/druid/blob/master/examples/bin/run_ec2.sh), run_ec2.sh, is located at 'examples/bin' if you have checked out the code, or at the root of the project if you've downloaded a tarball. The scripts rely on the [Amazon EC2 API Tools](http://aws.amazon.com/developertools/351), and you will need to set three environment variables:
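The three environment variables are not named in the hunk above. As a rough, hedged sketch only (the variable names follow common Amazon EC2 API Tools setups and are assumptions, not something this commit specifies), the shell setup might look like:

``` bash
# Assumed variable names for the Amazon EC2 API Tools; run_ec2.sh may expect different ones.
export EC2_HOME=/path/to/ec2-api-tools      # where the EC2 API Tools are unpacked (assumption)
export AWS_ACCESS_KEY=your-access-key       # AWS credential (assumption)
export AWS_SECRET_KEY=your-secret-key       # AWS credential (assumption)
examples/bin/run_ec2.sh                     # path per the paragraph above, when run from a source checkout
```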

View File

@@ -19,13 +19,13 @@ Clone Druid and build it:
 git clone https://github.com/metamx/druid.git druid
 cd druid
 git fetch --tags
-git checkout druid-0.6.40
+git checkout druid-0.6.46
 ./build.sh
 ```
 ### Downloading the DSK (Druid Standalone Kit)
-[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz) a stand-alone tarball and run it:
+[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz) a stand-alone tarball and run it:
 ``` bash
 tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@@ -27,7 +27,7 @@ druid.host=localhost
 druid.service=realtime
 druid.port=8083
-druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.40"]
+druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.46"]
 druid.zk.service.host=localhost

View File

@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
 ### Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz). Download this file to a directory of your choosing.
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz). Download this file to a directory of your choosing.
 You can extract the awesomeness within by issuing:
@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
 Not too lost so far right? That's great! If you cd into the directory:
 ```
-cd druid-services-0.6.40
+cd druid-services-0.6.46
 ```
 You should see a bunch of files:

View File

@@ -44,7 +44,7 @@ With real-world data, we recommend having a message bus such as [Apache Kafka](h
 #### Setting up Kafka
-[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.40/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
+[KafkaFirehoseFactory](https://github.com/metamx/druid/blob/druid-0.6.46/realtime/src/main/java/com/metamx/druid/realtime/firehose/KafkaFirehoseFactory.java) is how druid communicates with Kafka. Using this [Firehose](Firehose.html) with the right configuration, we can import data into Druid in real-time without writing any code. To load data to a real-time node via Kafka, we'll first need to initialize Zookeeper and Kafka, and then configure and initialize a [Realtime](Realtime.html) node.
 Instructions for booting a Zookeeper and then Kafka cluster are available [here](http://kafka.apache.org/07/quickstart.html).
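The linked quickstart amounts to starting Zookeeper and then a Kafka broker from an unpacked Kafka 0.7 distribution. As a hedged sketch (script and config file names are taken from the Kafka 0.7 layout referenced by the link, not from this commit):

``` bash
# Run from the root of an unpacked Kafka 0.7 distribution (paths are assumptions based on the linked quickstart)
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
```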

View File

@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes as well as and exter
 If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
-You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz)
+You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz)
 and untar the contents within by issuing:
@@ -149,7 +149,7 @@ druid.port=8081
 druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.40"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
 # Dummy read only AWS account (used to download example data)
 druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -238,7 +238,7 @@ druid.port=8083
 druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.40","io.druid.extensions:druid-kafka-seven:0.6.40"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.46","io.druid.extensions:druid-kafka-seven:0.6.46"]
 # Change this config to db to hand off to the rest of the Druid cluster
 druid.publish.type=noop

View File

@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
 h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz)
+We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz)
 Download this file to a directory of your choosing.
 You can extract the awesomeness within by issuing:
@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
 Not too lost so far right? That's great! If you cd into the directory:
 ```
-cd druid-services-0.6.40
+cd druid-services-0.6.46
 ```
 You should see a bunch of files:

View File

@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
 h3. Download a Tarball
-We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.40-bin.tar.gz.
+We've built a tarball that contains everything you'll need. You'll find it "here":http://static.druid.io/artifacts/releases/druid-services-0.6.46-bin.tar.gz.
 Download this bad boy to a directory of your choosing.
 You can extract the awesomeness within by issuing:

View File

@@ -4,7 +4,7 @@ druid.port=8081
 druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.40"]
+druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.46"]
 # Dummy read only AWS account (used to download example data)
 druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b

View File

@@ -4,7 +4,7 @@ druid.port=8083
 druid.zk.service.host=localhost
-druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.40","io.druid.extensions:druid-kafka-seven:0.6.40","io.druid.extensions:druid-rabbitmq:0.6.40"]
+druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.46","io.druid.extensions:druid-kafka-seven:0.6.46","io.druid.extensions:druid-rabbitmq:0.6.46"]
 # Change this config to db to hand off to the rest of the Druid cluster
 druid.publish.type=noop

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.43-SNAPSHOT</version>
+        <version>0.6.47-SNAPSHOT</version>
     </parent>
     <dependencies>

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.43-SNAPSHOT</version>
+        <version>0.6.47-SNAPSHOT</version>
     </parent>
     <dependencies>

View File

@@ -28,7 +28,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.6.43-SNAPSHOT</version>
+        <version>0.6.47-SNAPSHOT</version>
     </parent>
     <dependencies>

View File

@@ -28,7 +28,7 @@
    <parent>
        <groupId>io.druid</groupId>
        <artifactId>druid</artifactId>
-        <version>0.6.43-SNAPSHOT</version>
+        <version>0.6.47-SNAPSHOT</version>
    </parent>
    <dependencies>
@@ -47,95 +47,10 @@
            <artifactId>druid-indexing-hadoop</artifactId>
            <version>${project.parent.version}</version>
        </dependency>
-        <dependency>
-            <groupId>com.metamx</groupId>
-            <artifactId>emitter</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.metamx</groupId>
-            <artifactId>http-client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.metamx</groupId>
-            <artifactId>java-util</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.metamx</groupId>
-            <artifactId>server-metrics</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-codec</groupId>
-            <artifactId>commons-codec</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.skife.config</groupId>
-            <artifactId>config-magic</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.inject</groupId>
-            <artifactId>guice</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.inject.extensions</groupId>
-            <artifactId>guice-servlet</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.jaxrs</groupId>
-            <artifactId>jackson-jaxrs-json-provider</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>javax.inject</groupId>
-            <artifactId>javax.inject</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jdbi</groupId>
-            <artifactId>jdbi</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.sun.jersey</groupId>
-            <artifactId>jersey-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.sun.jersey.contribs</groupId>
-            <artifactId>jersey-guice</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.eclipse.jetty</groupId>
-            <artifactId>jetty-server</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>joda-time</groupId>
-            <artifactId>joda-time</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.findbugs</groupId>
-            <artifactId>jsr305</artifactId>
-        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.18</version>
+        </dependency>
        <!-- Tests -->

View File

@@ -20,10 +20,15 @@
 package io.druid.indexing.common;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.ServerView;
+import io.druid.indexing.common.actions.SegmentInsertAction;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;
@@ -37,10 +42,14 @@ import io.druid.segment.loading.SegmentLoader;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
 import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 /**
@@ -167,7 +176,7 @@ public class TaskToolbox
     return objectMapper;
   }
-  public Map<DataSegment, File> getSegments(List<DataSegment> segments)
+  public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
       throws SegmentLoadingException
   {
     Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
@@ -178,6 +187,25 @@ public class TaskToolbox
     return retVal;
   }
+  public void pushSegments(Iterable<DataSegment> segments) throws IOException {
+    // Request segment pushes for each set
+    final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
+        segments,
+        new Function<DataSegment, Interval>()
+        {
+          @Override
+          public Interval apply(DataSegment segment)
+          {
+            return segment.getInterval();
+          }
+        }
+    );
+    for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
+      getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
+    }
+  }
   public File getTaskWorkDir()
   {
     return taskWorkDir;

View File

@@ -19,6 +19,7 @@
 package io.druid.indexing.common.actions;
+import com.metamx.common.ISE;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.TaskStorage;
@@ -45,21 +46,21 @@ public class LocalTaskActionClient implements TaskActionClient
   {
     log.info("Performing action for task[%s]: %s", task.getId(), taskAction);
-    final RetType ret = taskAction.perform(task, toolbox);
     if (taskAction.isAudited()) {
       // Add audit log
       try {
         storage.addAuditLog(task, taskAction);
       }
       catch (Exception e) {
+        final String actionClass = taskAction.getClass().getName();
         log.makeAlert(e, "Failed to record action in audit log")
            .addData("task", task.getId())
-           .addData("actionClass", taskAction.getClass().getName())
+           .addData("actionClass", actionClass)
            .emit();
+        throw new ISE(e, "Failed to record action [%s] in audit log", actionClass);
       }
     }
-    return ret;
+    return taskAction.perform(task, toolbox);
   }
 }

View File

@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
-import com.metamx.common.ISE;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
 import io.druid.timeline.DataSegment;
@@ -80,9 +79,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
   @Override
   public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
   {
-    if(!toolbox.taskLockCoversSegments(task, segments, allowOlderVersions)) {
-      throw new ISE("Segments not covered by locks for task[%s]: %s", task.getId(), segments);
-    }
+    toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
     final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);

View File

@@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
-import com.metamx.common.ISE;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
 import io.druid.timeline.DataSegment;
@@ -42,10 +41,7 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
       Task task, TaskActionToolbox toolbox
   ) throws IOException
   {
-    if(!toolbox.taskLockCoversSegments(task, segments, true)) {
-      throw new ISE("Segments not covered by locks for task: %s", task.getId());
-    }
+    toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
     toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
     // Emit metrics

View File

@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
-import com.metamx.common.ISE;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
 import io.druid.timeline.DataSegment;
@@ -59,10 +58,7 @@ public class SegmentNukeAction implements TaskAction<Void>
   @Override
   public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
   {
-    if(!toolbox.taskLockCoversSegments(task, segments, true)) {
-      throw new ISE("Segments not covered by locks for task: %s", task.getId());
-    }
+    toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
     toolbox.getIndexerDBCoordinator().deleteSegments(segments);
     // Emit metrics

View File

@@ -19,9 +19,11 @@
 package io.druid.indexing.common.actions;
+import com.google.api.client.repackaged.com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
+import com.metamx.common.ISE;
 import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.task.Task;
@@ -65,6 +67,38 @@ public class TaskActionToolbox
     return emitter;
   }
+  public boolean segmentsAreFromSamePartitionSet(
+      final Set<DataSegment> segments
+  )
+  {
+    // Verify that these segments are all in the same partition set
+    Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty");
+    final DataSegment firstSegment = segments.iterator().next();
+    for (final DataSegment segment : segments) {
+      if (!segment.getDataSource().equals(firstSegment.getDataSource())
+          || !segment.getInterval().equals(firstSegment.getInterval())
+          || !segment.getVersion().equals(firstSegment.getVersion())) {
+        return false;
+      }
+    }
+    return true;
+  }
+  public void verifyTaskLocksAndSinglePartitionSettitude(
+      final Task task,
+      final Set<DataSegment> segments,
+      final boolean allowOlderVersions
+  )
+  {
+    if (!taskLockCoversSegments(task, segments, allowOlderVersions)) {
+      throw new ISE("Segments not covered by locks for task: %s", task.getId());
+    }
+    if (!segmentsAreFromSamePartitionSet(segments)) {
+      throw new ISE("Segments are not in the same partition set: %s", segments);
+    }
+  }
   public boolean taskLockCoversSegments(
       final Task task,
       final Set<DataSegment> segments,

View File

@@ -21,6 +21,7 @@ package io.druid.indexing.common.task;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import io.druid.indexing.common.actions.LockTryAcquireAction;
 import io.druid.indexing.common.actions.TaskActionClient;
 import org.joda.time.Interval;
@@ -58,7 +59,8 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
   )
   {
     super(id, groupId, taskResource, dataSource);
-    this.interval = interval;
+    this.interval = Preconditions.checkNotNull(interval, "interval");
+    Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
   }
   @Override

View File

@@ -22,6 +22,7 @@ package io.druid.indexing.common.task;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -103,7 +104,7 @@ public class DeleteTask extends AbstractFixedIntervalTask
         segment.getVersion()
     );
-    toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+    toolbox.pushSegments(ImmutableList.of(uploadedSegment));
     return TaskStatus.success(getId());
   }

View File

@@ -24,10 +24,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.metamx.common.logger.Logger;
 import io.druid.common.utils.JodaUtils;
 import io.druid.indexer.HadoopDruidIndexerConfig;
@@ -47,12 +51,15 @@ import io.tesla.aether.internal.DefaultTeslaAether;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import javax.annotation.Nullable;
 import java.io.File;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 public class HadoopIndexTask extends AbstractFixedIntervalTask
 {
@@ -180,14 +187,10 @@ public class HadoopIndexTask extends AbstractFixedIntervalTask
     if (segments != null) {
       List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
-          segments, new TypeReference<List<DataSegment>>()
-          {
-          }
+          segments,
+          new TypeReference<List<DataSegment>>() {}
       );
-      // Request segment pushes
-      toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
-      // Done
+      toolbox.pushSegments(publishedSegments);
       return TaskStatus.success(getId());
     } else {
       return TaskStatus.failure(getId());

View File

@@ -156,7 +156,7 @@ public class IndexTask extends AbstractFixedIntervalTask
         segments.add(segment);
       }
     }
-    toolbox.getTaskActionClient().submit(new SegmentInsertAction(segments));
+    toolbox.pushSegments(segments);
     return TaskStatus.success(getId());
   }

View File

@@ -97,11 +97,9 @@ public class KillTask extends AbstractFixedIntervalTask
     // Kill segments
     for (DataSegment segment : unusedSegments) {
       toolbox.getDataSegmentKiller().kill(segment);
+      toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.of(segment)));
     }
-    // Remove metadata for these segments
-    toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
     return TaskStatus.success(getId());
   }
 }

View File

@@ -27,6 +27,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -142,7 +143,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
       );
       // download segments to merge
-      final Map<DataSegment, File> gettedSegments = toolbox.getSegments(segments);
+      final Map<DataSegment, File> gettedSegments = toolbox.fetchSegments(segments);
       // merge files together
       final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));
@@ -165,7 +166,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
       emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
       emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
-      toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+      toolbox.pushSegments(ImmutableList.of(uploadedSegment));
       return TaskStatus.success(getId());
     }

View File

@@ -98,18 +98,12 @@ public class MoveTask extends AbstractFixedIntervalTask
       log.info("OK to move segment: %s", unusedSegment.getIdentifier());
     }
-    List<DataSegment> movedSegments = Lists.newLinkedList();
     // Move segments
     for (DataSegment segment : unusedSegments) {
-      movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec));
+      final DataSegment movedSegment = toolbox.getDataSegmentMover().move(segment, targetLoadSpec);
+      toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment)));
     }
-    // Update metadata for moved segments
-    toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(
-        ImmutableSet.copyOf(movedSegments)
-    ));
     return TaskStatus.success(getId());
   }

View File

@@ -35,9 +35,9 @@ import org.joda.time.DateTime;
 public class NoopTask extends AbstractTask
 {
   private static final Logger log = new Logger(NoopTask.class);
-  private static int defaultRunTime = 2500;
-  private static int defaultIsReadyTime = 0;
-  private static IsReadyResult defaultIsReadyResult = IsReadyResult.YES;
+  private static final int defaultRunTime = 2500;
+  private static final int defaultIsReadyTime = 0;
+  private static final IsReadyResult defaultIsReadyResult = IsReadyResult.YES;
   enum IsReadyResult
   {
@@ -139,4 +139,9 @@ public class NoopTask extends AbstractTask
     log.info("Woke up!");
     return TaskStatus.success(getId());
   }
+  public static NoopTask create()
+  {
+    return new NoopTask(null, 0, 0, null, null);
+  }
 }

View File

@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.io.Closeables;
 import com.metamx.common.exception.FormattedException;
 import com.metamx.emitter.EmittingLogger;
@@ -35,9 +34,7 @@ import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.LockAcquireAction;
-import io.druid.indexing.common.actions.LockListAction;
 import io.druid.indexing.common.actions.LockReleaseAction;
-import io.druid.indexing.common.actions.SegmentInsertAction;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.query.FinalizeResultsQueryRunner;
 import io.druid.query.Query;
@@ -212,7 +209,7 @@ public class RealtimeIndexTask extends AbstractTask
       @Override
       public void announceSegment(final DataSegment segment) throws IOException
       {
-        // NOTE: Side effect: Calling announceSegment causes a lock to be acquired
+        // Side effect: Calling announceSegment causes a lock to be acquired
         toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
         toolbox.getSegmentAnnouncer().announceSegment(segment);
       }
@@ -231,6 +228,7 @@ public class RealtimeIndexTask extends AbstractTask
       @Override
       public void announceSegments(Iterable<DataSegment> segments) throws IOException
       {
+        // Side effect: Calling announceSegments causes locks to be acquired
         for (DataSegment segment : segments) {
           toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
         }
@@ -263,7 +261,7 @@ public class RealtimeIndexTask extends AbstractTask
       public String getVersion(final Interval interval)
       {
         try {
-          // NOTE: Side effect: Calling getVersion causes a lock to be acquired
+          // Side effect: Calling getVersion causes a lock to be acquired
           final TaskLock myLock = toolbox.getTaskActionClient()
                                          .submit(new LockAcquireAction(interval));
@@ -418,7 +416,7 @@ public class RealtimeIndexTask extends AbstractTask
       @Override
       public void publishSegment(DataSegment segment) throws IOException
       {
-        taskToolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
+        taskToolbox.pushSegments(ImmutableList.of(segment));
       }
     }
 }

View File

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
@@ -248,7 +249,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
       }
     }
-    final Map<DataSegment, File> localSegments = toolbox.getSegments(Arrays.asList(segment));
+    final Map<DataSegment, File> localSegments = toolbox.fetchSegments(Arrays.asList(segment));
     final File location = localSegments.get(segment);
     final File outLocation = new File(location, "v9_out");

View File

@@ -489,7 +489,18 @@ public class DbTaskStorage implements TaskStorage
     final Map<Long, TaskLock> retMap = Maps.newHashMap();
     for (final Map<String, Object> row : dbTaskLocks) {
-      retMap.put((Long) row.get("id"), jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class));
+      try {
+        retMap.put(
+            (Long) row.get("id"),
+            jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class)
+        );
+      }
+      catch (Exception e) {
+        log.makeAlert(e, "Failed to deserialize TaskLock")
+           .addData("task", taskid)
+           .addData("lockPayload", row)
+           .emit();
+      }
     }
     return retMap;
   }

View File

@@ -42,6 +42,7 @@ import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import java.io.IOException;
@@ -169,39 +170,39 @@ public class IndexerDBCoordinator
   private boolean announceHistoricalSegment(final Handle handle, final DataSegment segment) throws IOException
   {
     try {
-      final List<Map<String, Object>> exists = handle.createQuery(
-          String.format(
-              "SELECT id FROM %s WHERE id = :identifier",
-              dbTables.getSegmentsTable()
-          )
-      ).bind(
-          "identifier",
-          segment.getIdentifier()
-      ).list();
-      if (!exists.isEmpty()) {
+      if (segmentExists(handle, segment)) {
         log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
         return false;
       }
-      handle.createStatement(
-          String.format(
-              "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-              dbTables.getSegmentsTable()
-          )
-      )
-            .bind("id", segment.getIdentifier())
-            .bind("dataSource", segment.getDataSource())
-            .bind("created_date", new DateTime().toString())
-            .bind("start", segment.getInterval().getStart().toString())
-            .bind("end", segment.getInterval().getEnd().toString())
-            .bind("partitioned", segment.getShardSpec().getPartitionNum())
-            .bind("version", segment.getVersion())
-            .bind("used", true)
-            .bind("payload", jsonMapper.writeValueAsString(segment))
-            .execute();
+      // Try/catch to work around races due to SELECT -> INSERT. Avoid ON DUPLICATE KEY since it's not portable.
+      try {
+        handle.createStatement(
+            String.format(
+                "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+                + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+                dbTables.getSegmentsTable()
+            )
+        )
+              .bind("id", segment.getIdentifier())
+              .bind("dataSource", segment.getDataSource())
+              .bind("created_date", new DateTime().toString())
+              .bind("start", segment.getInterval().getStart().toString())
+              .bind("end", segment.getInterval().getEnd().toString())
+              .bind("partitioned", segment.getShardSpec().getPartitionNum())
+              .bind("version", segment.getVersion())
+              .bind("used", true)
+              .bind("payload", jsonMapper.writeValueAsString(segment))
+              .execute();
         log.info("Published segment [%s] to DB", segment.getIdentifier());
+      } catch (Exception e) {
+        if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
+          log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
+        } else {
+          throw e;
+        }
+      }
     }
     catch (IOException e) {
       log.error(e, "Exception inserting into DB");
@@ -211,6 +212,20 @@ public class IndexerDBCoordinator
     return true;
   }
+  private boolean segmentExists(final Handle handle, final DataSegment segment) {
+    final List<Map<String, Object>> exists = handle.createQuery(
+        String.format(
+            "SELECT id FROM %s WHERE id = :identifier",
+            dbTables.getSegmentsTable()
+        )
+    ).bind(
+        "identifier",
+        segment.getIdentifier()
+    ).list();
+    return !exists.isEmpty();
+  }
   public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
   {
     dbi.inTransaction(

View File

@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -251,26 +252,26 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
   @Override
   public Collection<ZkWorker> getWorkers()
   {
-    return zkWorkers.values();
+    return ImmutableList.copyOf(zkWorkers.values());
   }
   @Override
   public Collection<RemoteTaskRunnerWorkItem> getRunningTasks()
   {
-    return runningTasks.values();
+    return ImmutableList.copyOf(runningTasks.values());
   }
   @Override
   public Collection<RemoteTaskRunnerWorkItem> getPendingTasks()
   {
-    return pendingTasks.values();
+    return ImmutableList.copyOf(pendingTasks.values());
   }
   @Override
   public Collection<RemoteTaskRunnerWorkItem> getKnownTasks()
   {
     // Racey, since there is a period of time during assignment when a task is neither pending nor running
-    return Lists.newArrayList(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
+    return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
   }
   public ZkWorker findWorkerRunningTask(String taskId)

View File

@@ -19,6 +19,7 @@
 package io.druid.indexing.overlord;
+import com.google.api.client.repackaged.com.google.common.base.Preconditions;
 import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
@@ -109,6 +110,11 @@ public class TaskLockbox
     for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
       final Task task = taskAndLock.lhs;
       final TaskLock savedTaskLock = taskAndLock.rhs;
+      if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
+        // "Impossible", but you never know what crazy stuff can be restored from storage.
+        log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
+        continue;
+      }
       uniqueTaskIds.add(task.getId());
       final Optional<TaskLock> acquiredTaskLock = tryLock(
           task,
@@ -205,6 +211,7 @@ public class TaskLockbox
     giant.lock();
     try {
+      Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
       final String dataSource = task.getDataSource();
       final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
       final TaskLockPosse posseToUse;

View File

@@ -28,12 +28,10 @@ import java.util.List;
 public class AutoScalingData
 {
   private final List<String> nodeIds;
-  private final List nodes;
-  public AutoScalingData(List<String> nodeIds, List nodes)
+  public AutoScalingData(List<String> nodeIds)
   {
     this.nodeIds = nodeIds;
-    this.nodes = nodes;
   }
   @JsonProperty
@@ -42,17 +40,11 @@ public class AutoScalingData
     return nodeIds;
   }
-  public List getNodes()
-  {
-    return nodes;
-  }
   @Override
   public String toString()
   {
     return "AutoScalingData{" +
            "nodeIds=" + nodeIds +
-           ", nodes=" + nodes +
            '}';
   }
 }

View File

@@ -125,8 +125,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
                   return input.getInstanceId();
                 }
               }
-          ),
-          result.getReservation().getInstances()
+          )
       );
     }
     catch (Exception e) {
@@ -140,7 +139,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
   public AutoScalingData terminate(List<String> ips)
   {
     if (ips.isEmpty()) {
-      return new AutoScalingData(Lists.<String>newArrayList(), Lists.<Instance>newArrayList());
+      return new AutoScalingData(Lists.<String>newArrayList());
     }
     DescribeInstancesResult result = amazonEC2Client.describeInstances(
@@ -184,8 +183,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
                   return String.format("%s:%s", input, config.getWorkerPort());
                 }
               }
-          ),
-          instances
+          )
       );
     }
     catch (Exception e) {

View File

@@ -20,13 +20,16 @@
 package io.druid.indexing.overlord.scaling;
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
-import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.common.ISE;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
 import io.druid.indexing.overlord.TaskRunnerWorkItem;
@@ -38,7 +41,6 @@ import org.joda.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
 /**
  */
@@ -48,211 +50,194 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
   private final AutoScalingStrategy autoScalingStrategy;
   private final SimpleResourceManagementConfig config;
-  private final Supplier<WorkerSetupData> workerSetupdDataRef;
+  private final Supplier<WorkerSetupData> workerSetupDataRef;
   private final ScalingStats scalingStats;
-  private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
-  private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
-  private volatile DateTime lastProvisionTime = new DateTime();
-  private volatile DateTime lastTerminateTime = new DateTime();
+  private final Object lock = new Object();
+  private final Set<String> currentlyProvisioning = Sets.newHashSet();
+  private final Set<String> currentlyTerminating = Sets.newHashSet();
+  private int targetWorkerCount = -1;
+  private DateTime lastProvisionTime = new DateTime();
+  private DateTime lastTerminateTime = new DateTime();
   @Inject
   public SimpleResourceManagementStrategy(
       AutoScalingStrategy autoScalingStrategy,
       SimpleResourceManagementConfig config,
-      Supplier<WorkerSetupData> workerSetupdDataRef
+      Supplier<WorkerSetupData> workerSetupDataRef
   )
   {
     this.autoScalingStrategy = autoScalingStrategy;
     this.config = config;
-    this.workerSetupdDataRef = workerSetupdDataRef;
+    this.workerSetupDataRef = workerSetupDataRef;
     this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
   }
   @Override
   public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
   {
-    final WorkerSetupData workerSetupData = workerSetupdDataRef.get();
-    final String minVersion = workerSetupData.getMinVersion() == null
-                              ? config.getWorkerVersion()
-                              : workerSetupData.getMinVersion();
-    int maxNumWorkers = workerSetupData.getMaxNumWorkers();
-    int currValidWorkers = 0;
-    for (ZkWorker zkWorker : zkWorkers) {
-      if (zkWorker.isValidVersion(minVersion)) {
-        currValidWorkers++;
-      }
-    }
-    if (currValidWorkers >= maxNumWorkers) {
-      log.debug(
-          "Cannot scale anymore. Num workers = %d, Max num workers = %d",
-          zkWorkers.size(),
-          workerSetupdDataRef.get().getMaxNumWorkers()
-      );
-      return false;
-    }
-    List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
-        Lists.newArrayList(
-            Iterables.<ZkWorker, String>transform(
-                zkWorkers,
-                new Function<ZkWorker, String>()
-                {
-                  @Override
-                  public String apply(ZkWorker input)
-                  {
-                    return input.getWorker().getIp();
-                  }
-                }
-            )
-        )
-    );
-    currentlyProvisioning.removeAll(workerNodeIds);
-    boolean nothingProvisioning = currentlyProvisioning.isEmpty();
-    if (nothingProvisioning) {
-      if (hasTaskPendingBeyondThreshold(pendingTasks)) {
-        AutoScalingData provisioned = autoScalingStrategy.provision();
-        if (provisioned != null) {
-          currentlyProvisioning.addAll(provisioned.getNodeIds());
-          lastProvisionTime = new DateTime();
-          scalingStats.addProvisionEvent(provisioned);
-          return true;
-        }
-      }
-    } else {
-      Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
-      log.info(
-          "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker. Current wait time: %s",
-          currentlyProvisioning,
-          durSinceLastProvision
-      );
-      if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
-        log.makeAlert("Worker node provisioning taking too long!")
-           .addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
-           .addData("provisioningCount", currentlyProvisioning.size())
-           .emit();
-        List<String> nodeIps = autoScalingStrategy.idToIpLookup(Lists.newArrayList(currentlyProvisioning));
-        autoScalingStrategy.terminate(nodeIps);
-        currentlyProvisioning.clear();
-      }
-    }
-    return false;
+    synchronized (lock) {
+      boolean didProvision = false;
+      final WorkerSetupData workerSetupData = workerSetupDataRef.get();
+      if (workerSetupData == null) {
+        log.warn("No workerSetupData available, cannot provision new workers.");
+        return false;
+      }
+      final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);
+      final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
+      final List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
+          Lists.newArrayList(
+              Iterables.<ZkWorker, String>transform(
+                  zkWorkers,
+                  new Function<ZkWorker, String>()
+                  {
+                    @Override
+                    public String apply(ZkWorker input)
+                    {
+                      return input.getWorker().getIp();
+                    }
+                  }
+              )
+          )
+      );
+      currentlyProvisioning.removeAll(workerNodeIds);
+      updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers);
+      int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
+      while (want > 0) {
+        final AutoScalingData provisioned = autoScalingStrategy.provision();
+        final List<String> newNodes;
+        if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) {
+          break;
+        } else {
+          currentlyProvisioning.addAll(newNodes);
+          lastProvisionTime = new DateTime();
+          scalingStats.addProvisionEvent(provisioned);
+          want -= provisioned.getNodeIds().size();
+          didProvision = true;
+        }
+      }
+      if (!currentlyProvisioning.isEmpty()) {
+        Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
+        log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision);
+        if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
+          log.makeAlert("Worker node provisioning taking too long!")
+             .addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
+             .addData("provisioningCount", currentlyProvisioning.size())
+             .emit();
+          List<String> nodeIps = autoScalingStrategy.idToIpLookup(Lists.newArrayList(currentlyProvisioning));
+          autoScalingStrategy.terminate(nodeIps);
+          currentlyProvisioning.clear();
+        }
+      }
+      return didProvision;
+    }
   }
  @Override
  public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
  {
    synchronized (lock) {
      final WorkerSetupData workerSetupData = workerSetupDataRef.get();
      if (workerSetupData == null) {
        log.warn("No workerSetupData available, cannot terminate workers.");
        return false;
      }

      boolean didTerminate = false;
      final Set<String> workerNodeIds = Sets.newHashSet(
          autoScalingStrategy.ipToIdLookup(
              Lists.newArrayList(
                  Iterables.transform(
                      zkWorkers,
                      new Function<ZkWorker, String>()
                      {
                        @Override
                        public String apply(ZkWorker input)
                        {
                          return input.getWorker().getIp();
                        }
                      }
                  )
              )
          )
      );

      final Set<String> stillExisting = Sets.newHashSet();
      for (String s : currentlyTerminating) {
        if (workerNodeIds.contains(s)) {
          stillExisting.add(s);
        }
      }
      currentlyTerminating.clear();
      currentlyTerminating.addAll(stillExisting);

      updateTargetWorkerCount(workerSetupData, pendingTasks, zkWorkers);

      final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
      if (currentlyTerminating.isEmpty()) {
        final int excessWorkers = (zkWorkers.size() + currentlyProvisioning.size()) - targetWorkerCount;
        if (excessWorkers > 0) {
          final List<String> laziestWorkerIps =
              FluentIterable.from(zkWorkers)
                            .filter(isLazyWorker)
                            .limit(excessWorkers)
                            .transform(
                                new Function<ZkWorker, String>()
                                {
                                  @Override
                                  public String apply(ZkWorker zkWorker)
                                  {
                                    return zkWorker.getWorker().getIp();
                                  }
                                }
                            )
                            .toList();

          if (laziestWorkerIps.isEmpty()) {
            log.info("Wanted to terminate %,d workers, but couldn't find any lazy ones!", excessWorkers);
          } else {
            log.info(
                "Terminating %,d workers (wanted %,d): %s",
                laziestWorkerIps.size(),
                excessWorkers,
                Joiner.on(", ").join(laziestWorkerIps)
            );

            final AutoScalingData terminated = autoScalingStrategy.terminate(laziestWorkerIps);
            if (terminated != null) {
              currentlyTerminating.addAll(terminated.getNodeIds());
              lastTerminateTime = new DateTime();
              scalingStats.addTerminateEvent(terminated);
              didTerminate = true;
            }
          }
        }
      } else {
        Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());

        log.info("%s terminating. Current wait time: %s", currentlyTerminating, durSinceLastTerminate);

        if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
          log.makeAlert("Worker node termination taking too long!")
             .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
             .addData("terminatingCount", currentlyTerminating.size())
             .emit();

          currentlyTerminating.clear();
        }
      }

      return didTerminate;
    }
  }
  @Override
@ -261,16 +246,128 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
    return scalingStats;
  }

  private static Predicate<ZkWorker> createLazyWorkerPredicate(
      final SimpleResourceManagementConfig config,
      final WorkerSetupData workerSetupData
  )
  {
    final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(config, workerSetupData);

    return new Predicate<ZkWorker>()
    {
      @Override
      public boolean apply(ZkWorker worker)
      {
        final boolean itHasBeenAWhile = System.currentTimeMillis() - worker.getLastCompletedTaskTime().getMillis()
                                        >= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
        return worker.getRunningTasks().isEmpty() && (itHasBeenAWhile || !isValidWorker.apply(worker));
      }
    };
  }

  private static Predicate<ZkWorker> createValidWorkerPredicate(
      final SimpleResourceManagementConfig config,
      final WorkerSetupData workerSetupData
  )
  {
    return new Predicate<ZkWorker>()
    {
      @Override
      public boolean apply(ZkWorker zkWorker)
      {
        final String minVersion = workerSetupData.getMinVersion() != null
                                  ? workerSetupData.getMinVersion()
                                  : config.getWorkerVersion();
        if (minVersion == null) {
          throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
        }
        return zkWorker.isValidVersion(minVersion);
      }
    };
  }

  private void updateTargetWorkerCount(
      final WorkerSetupData workerSetupData,
      final Collection<RemoteTaskRunnerWorkItem> pendingTasks,
      final Collection<ZkWorker> zkWorkers
  )
  {
    synchronized (lock) {
      final Collection<ZkWorker> validWorkers = Collections2.filter(
          zkWorkers,
          createValidWorkerPredicate(config, workerSetupData)
      );
      final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);

      if (targetWorkerCount < 0) {
        // Initialize to size of current worker pool, subject to pool size limits
        targetWorkerCount = Math.max(
            Math.min(
                zkWorkers.size(),
                workerSetupData.getMaxNumWorkers()
            ),
            workerSetupData.getMinNumWorkers()
        );
        log.info(
            "Starting with a target of %,d workers (current = %,d, min = %,d, max = %,d).",
            targetWorkerCount,
            validWorkers.size(),
            workerSetupData.getMinNumWorkers(),
            workerSetupData.getMaxNumWorkers()
        );
      }

      final boolean atSteadyState = currentlyProvisioning.isEmpty()
                                    && currentlyTerminating.isEmpty()
                                    && validWorkers.size() == targetWorkerCount;
      final boolean shouldScaleUp = atSteadyState
                                    && hasTaskPendingBeyondThreshold(pendingTasks)
                                    && targetWorkerCount < workerSetupData.getMaxNumWorkers();
      final boolean shouldScaleDown = atSteadyState
                                      && Iterables.any(validWorkers, isLazyWorker)
                                      && targetWorkerCount > workerSetupData.getMinNumWorkers();

      if (shouldScaleUp) {
        targetWorkerCount++;
        log.info(
            "I think we should scale up to %,d workers (current = %,d, min = %,d, max = %,d).",
            targetWorkerCount,
            validWorkers.size(),
            workerSetupData.getMinNumWorkers(),
            workerSetupData.getMaxNumWorkers()
        );
      } else if (shouldScaleDown) {
        targetWorkerCount--;
        log.info(
            "I think we should scale down to %,d workers (current = %,d, min = %,d, max = %,d).",
            targetWorkerCount,
            validWorkers.size(),
            workerSetupData.getMinNumWorkers(),
            workerSetupData.getMaxNumWorkers()
        );
      } else {
        log.info(
            "Our target is %,d workers, and I'm okay with that (current = %,d, min = %,d, max = %,d).",
            targetWorkerCount,
            validWorkers.size(),
            workerSetupData.getMinNumWorkers(),
            workerSetupData.getMaxNumWorkers()
        );
      }
    }
  }

  private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks)
  {
    synchronized (lock) {
      long now = System.currentTimeMillis();
      for (TaskRunnerWorkItem pendingTask : pendingTasks) {
        final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
        final Duration timeoutDuration = config.getPendingTaskTimeout().toStandardDuration();
        if (durationSinceInsertion.isEqual(timeoutDuration) || durationSinceInsertion.isLongerThan(timeoutDuration)) {
          return true;
        }
      }
      return false;
    }
  }
}
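Taken together, doProvision and doTerminate now steer toward a single targetWorkerCount instead of making independent decisions, and both adjust it through updateTargetWorkerCount under the same lock. The following is a minimal standalone sketch of that one-step-at-a-time rule, not part of the patch; all names are illustrative, and the real bounds come from WorkerSetupData and the pending/lazy checks above.

// Simplified sketch of the scaling rule applied by updateTargetWorkerCount above.
// Illustrative only; not part of the patch.
public class TargetWorkerCountSketch
{
  public static int nextTarget(
      int currentTarget,          // -1 means "not yet initialized"
      int validWorkers,
      boolean scalingInFlight,    // anything currently provisioning or terminating
      boolean taskPendingTooLong, // hasTaskPendingBeyondThreshold(...)
      boolean someWorkerIsLazy,   // Iterables.any(validWorkers, isLazyWorker)
      int minWorkers,
      int maxWorkers
  )
  {
    if (currentTarget < 0) {
      // First call: start from the current pool size, clamped to [min, max].
      return Math.max(Math.min(validWorkers, maxWorkers), minWorkers);
    }
    final boolean atSteadyState = !scalingInFlight && validWorkers == currentTarget;
    if (atSteadyState && taskPendingTooLong && currentTarget < maxWorkers) {
      return currentTarget + 1; // scale up one worker at a time
    }
    if (atSteadyState && someWorkerIsLazy && currentTarget > minWorkers) {
      return currentTarget - 1; // scale down one worker at a time
    }
    return currentTarget;       // otherwise hold the current target
  }

  public static void main(String[] args)
  {
    // Two valid workers, a task pending past the timeout, max of five: prints 3.
    System.out.println(nextTarget(2, 2, false, true, false, 0, 5));
  }
}

Because the target only moves when nothing is already provisioning or terminating, the pool converges one worker per cycle rather than over- or under-shooting while earlier scaling requests are still in flight.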

@ -44,7 +44,7 @@ public class TestMergeTask extends MergeTask
        Lists.<DataSegment>newArrayList(
            new DataSegment(
                "dummyDs",
                new Interval(new DateTime(), new DateTime().plus(1)),
                new DateTime().toString(),
                null,
                null,

@ -126,13 +126,11 @@ public class EC2AutoScalingStrategyTest
AutoScalingData created = strategy.provision(); AutoScalingData created = strategy.provision();
Assert.assertEquals(created.getNodeIds().size(), 1); Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals(created.getNodes().size(), 1);
Assert.assertEquals("theInstance", created.getNodeIds().get(0)); Assert.assertEquals("theInstance", created.getNodeIds().get(0));
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP")); AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));
Assert.assertEquals(deleted.getNodeIds().size(), 1); Assert.assertEquals(deleted.getNodeIds().size(), 1);
Assert.assertEquals(deleted.getNodes().size(), 1);
Assert.assertEquals(String.format("%s:8080", IP), deleted.getNodeIds().get(0)); Assert.assertEquals(String.format("%s:8080", IP), deleted.getNodeIds().get(0));
} }
} }
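The dropped getNodes() assertions reflect AutoScalingData being reduced to a plain list of node ids. A small hedged usage sketch follows, assuming the class lives in io.druid.indexing.overlord.scaling and keeps the single-argument constructor exercised by the tests further down.

import com.google.common.collect.Lists;
import io.druid.indexing.overlord.scaling.AutoScalingData; // assumed package

public class AutoScalingDataSketch
{
  public static void main(String[] args)
  {
    // Only node ids are carried now; the separate "nodes" payload is gone.
    AutoScalingData data = new AutoScalingData(Lists.newArrayList("i-1234abcd"));
    System.out.println(data.getNodeIds());
  }
}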

@ -19,6 +19,7 @@
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.scaling;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -28,6 +29,7 @@ import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers; import io.druid.common.guava.DSuppliers;
import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker; import io.druid.indexing.overlord.ZkWorker;
@ -63,7 +65,7 @@ public class SimpleResourceManagementStrategyTest
public void setUp() throws Exception public void setUp() throws Exception
{ {
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
    workerSetupData = new AtomicReference<>(
new WorkerSetupData( new WorkerSetupData(
"0", 0, 2, null, null, null "0", 0, 2, null, null, null
) )
@ -105,7 +107,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()); .andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScalingStrategy.provision()).andReturn(
        new AutoScalingData(Lists.<String>newArrayList("aNode"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
@ -133,7 +135,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2); .andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScalingStrategy.provision()).andReturn(
        new AutoScalingData(Lists.<String>newArrayList("fake"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
@ -190,7 +192,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject()))
.andReturn(null); .andReturn(null);
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScalingStrategy.provision()).andReturn(
        new AutoScalingData(Lists.<String>newArrayList("fake"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
@ -242,7 +244,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()); .andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn( EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
        new AutoScalingData(Lists.<String>newArrayList())
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
@ -272,7 +274,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2); .andReturn(Lists.<String>newArrayList("ip")).times(2);
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn( EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
        new AutoScalingData(Lists.<String>newArrayList("ip"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
@ -309,15 +311,174 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScalingStrategy);
} }
@Test
public void testNoActionNeeded() throws Exception
{
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testMinVersionIncrease() throws Exception
{
// Don't terminate anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h1", "i2", "0")
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
// Don't provision anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Increase minVersion
workerSetupData.set(new WorkerSetupData("1", 0, 2, null, null, null));
// Provision two new workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3"))
);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4"))
);
EasyMock.replay(autoScalingStrategy);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h2", "i2", "0")
)
);
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Terminate old workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(ImmutableList.of("i1", "i2", "i3", "i4"))).andReturn(
ImmutableList.of("h1", "h2", "h3", "h4")
);
EasyMock.expect(autoScalingStrategy.terminate(ImmutableList.of("i1", "i2"))).andReturn(
new AutoScalingData(ImmutableList.of("h1", "h2"))
);
EasyMock.replay(autoScalingStrategy);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(null, "h1", "i1", "0"),
new TestZkWorker(null, "h2", "i2", "0"),
new TestZkWorker(NoopTask.create(), "h3", "i3", "1"),
new TestZkWorker(NoopTask.create(), "h4", "i4", "1")
)
);
Assert.assertTrue(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testNullWorkerSetupData() throws Exception
{
workerSetupData.set(null);
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(
new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
),
Arrays.<ZkWorker>asList(
new TestZkWorker(null)
)
);
Assert.assertFalse(terminatedSomething);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
}
  private static class TestZkWorker extends ZkWorker
  {
    private final Task testTask;

    public TestZkWorker(
        Task testTask
    )
    {
      this(testTask, "host", "ip", "0");
    }

    public TestZkWorker(
        Task testTask,
        String host,
        String ip,
        String version
    )
    {
      super(new Worker(host, ip, 3, version), null, new DefaultObjectMapper());
      this.testTask = testTask;
    }

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.43-SNAPSHOT</version> <version>0.6.47-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.43-SNAPSHOT</version> <version>0.6.47-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

pom.xml
@ -23,7 +23,7 @@
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<version>0.6.43-SNAPSHOT</version> <version>0.6.47-SNAPSHOT</version>
<name>druid</name> <name>druid</name>
<description>druid</description> <description>druid</description>
<scm> <scm>
@ -40,7 +40,7 @@
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.1</metamx.java-util.version> <metamx.java-util.version>0.25.1</metamx.java-util.version>
<apache.curator.version>2.1.0-incubating</apache.curator.version> <apache.curator.version>2.3.0</apache.curator.version>
<druid.api.version>0.1.7</druid.api.version> <druid.api.version>0.1.7</druid.api.version>
</properties> </properties>
@ -73,7 +73,7 @@
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>emitter</artifactId> <artifactId>emitter</artifactId>
<version>0.2.6</version> <version>0.2.7</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
@ -289,6 +289,16 @@
<groupId>com.sun.jersey.contribs</groupId> <groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId> <artifactId>jersey-guice</artifactId>
<version>1.17.1</version> <version>1.17.1</version>
<exclusions>
<exclusion>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.sun.jersey</groupId> <groupId>com.sun.jersey</groupId>
@ -409,7 +419,7 @@
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<version>4.8.1</version> <version>4.11</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.43-SNAPSHOT</version> <version>0.6.47-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
@ -37,10 +37,6 @@
<artifactId>druid-common</artifactId> <artifactId>druid-common</artifactId>
<version>${project.parent.version}</version> <version>${project.parent.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>bytebuffer-collections</artifactId> <artifactId>bytebuffer-collections</artifactId>
@ -49,8 +45,6 @@
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>emitter</artifactId> <artifactId>emitter</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.ning</groupId> <groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId> <artifactId>compress-lzf</artifactId>
@ -63,34 +57,6 @@
<groupId>it.uniroma3.mat</groupId> <groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId> <artifactId>extendedset</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId> <artifactId>slf4j-log4j12</artifactId>
@ -107,6 +73,11 @@
<groupId>com.ibm.icu</groupId> <groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId> <artifactId>icu4j</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.mozilla</groupId>
<artifactId>rhino</artifactId>
<version>1.7R4</version>
</dependency>
<!-- Tests --> <!-- Tests -->

@ -9,7 +9,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.43-SNAPSHOT</version> <version>0.6.47-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.43-SNAPSHOT</version> <version>0.6.47-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

@ -28,7 +28,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.43-SNAPSHOT</version> <version>0.6.47-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
@ -37,24 +37,14 @@
<artifactId>druid-processing</artifactId> <artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version> <version>${project.parent.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>http-client</artifactId> <artifactId>http-client</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId> <artifactId>server-metrics</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-cli</groupId> <groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId> <artifactId>commons-cli</artifactId>
@ -64,21 +54,13 @@
<artifactId>commons-lang</artifactId> <artifactId>commons-lang</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-io</groupId> <groupId>javax.inject</groupId>
<artifactId>commons-io</artifactId> <artifactId>javax.inject</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.amazonaws</groupId> <groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId> <artifactId>aws-java-sdk</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
</dependency>
<dependency>
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.curator</groupId> <groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId> <artifactId>curator-framework</artifactId>
@ -87,42 +69,14 @@
<groupId>org.apache.curator</groupId> <groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId> <artifactId>curator-x-discovery</artifactId>
</dependency> </dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId> <groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId> <artifactId>jackson-jaxrs-json-provider</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId> <groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId> <artifactId>jackson-dataformat-smile</artifactId>
</dependency> </dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.sun.jersey</groupId> <groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId> <artifactId>jersey-server</artifactId>
@ -131,6 +85,10 @@
<groupId>com.sun.jersey</groupId> <groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId> <artifactId>jersey-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.sun.jersey.contribs</groupId> <groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId> <artifactId>jersey-guice</artifactId>
@ -139,22 +97,10 @@
<groupId>org.eclipse.jetty</groupId> <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId> <artifactId>jetty-server</artifactId>
</dependency> </dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.google.code.findbugs</groupId> <groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId> <artifactId>jsr305</artifactId>
</dependency> </dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency> <dependency>
<groupId>io.tesla.aether</groupId> <groupId>io.tesla.aether</groupId>
<artifactId>tesla-aether</artifactId> <artifactId>tesla-aether</artifactId>

@ -40,14 +40,7 @@ import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import io.druid.server.initialization.CuratorDiscoveryConfig; import io.druid.server.initialization.CuratorDiscoveryConfig;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.*;
import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.apache.curator.x.discovery.details.ServiceCacheListener;
import java.io.IOException; import java.io.IOException;
@ -389,8 +382,12 @@ public class DiscoveryModule implements Module
} }
      @Override
      public ServiceProviderBuilder<T> downInstancePolicy(DownInstancePolicy downInstancePolicy) {
        return this;
      }

      @Override
      public ServiceProviderBuilder<T> additionalFilter(InstanceFilter<T> tInstanceFilter) {
        return this;
      }
} }
@ -409,6 +406,11 @@ public class DiscoveryModule implements Module
return null; return null;
} }
@Override
public void noteError(ServiceInstance<T> tServiceInstance) {
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

@ -62,6 +62,11 @@ public class ServerDiscoveryFactory
return null; return null;
} }
@Override
public void noteError(ServiceInstance<T> tServiceInstance) {
// do nothing
}
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {

@ -209,7 +209,7 @@ public class DatabaseRuleManager
          String.format(
              "SELECT r.dataSource, r.payload "
              + "FROM %1$s r "
              + "INNER JOIN(SELECT dataSource, max(version) as version FROM %1$s GROUP BY dataSource) ds "
              + "ON r.datasource = ds.datasource and r.version = ds.version",
              getRulesTable()
          )

@ -213,7 +213,7 @@ public class DatabaseSegmentManager
for (DataSegment segment : segments) { for (DataSegment segment : segments) {
batch.add( batch.add(
              String.format(
                  "UPDATE %s SET used=true WHERE id = '%s'",
getSegmentsTable(), getSegmentsTable(),
segment.getIdentifier() segment.getIdentifier()
) )
@ -244,7 +244,7 @@ public class DatabaseSegmentManager
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
handle.createStatement( handle.createStatement(
                  String.format("UPDATE %s SET used=true WHERE id = :id", getSegmentsTable())
) )
.bind("id", segmentId) .bind("id", segmentId)
.execute(); .execute();
@ -278,7 +278,7 @@ public class DatabaseSegmentManager
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
handle.createStatement( handle.createStatement(
                  String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable())
) )
.bind("dataSource", ds) .bind("dataSource", ds)
.execute(); .execute();
@ -308,7 +308,7 @@ public class DatabaseSegmentManager
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
handle.createStatement( handle.createStatement(
                  String.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable())
).bind("segmentID", segmentID) ).bind("segmentID", segmentID)
.execute(); .execute();
@ -408,7 +408,7 @@ public class DatabaseSegmentManager
public List<Map<String, Object>> withHandle(Handle handle) throws Exception public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{ {
return handle.createQuery( return handle.createQuery(
                String.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable())
).list(); ).list();
} }
} }
@ -465,4 +465,4 @@ public class DatabaseSegmentManager
private String getSegmentsTable() { private String getSegmentsTable() {
return dbTables.get().getSegmentsTable(); return dbTables.get().getSegmentsTable();
} }
} }
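The switch from used=1/used=0 to used=true/used=false is not explained in the hunk itself; a plausible reading is that it keeps these statements valid when the used column is a real BOOLEAN (as in a PostgreSQL schema), while MySQL's TINYINT(1) also accepts the TRUE/FALSE keywords. Below is a minimal plain-JDBC sketch under that assumption; the URL, credentials, table name, and segment id are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class EnableSegmentSketch
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder connection details; assumes a segments table with a BOOLEAN "used" column.
    try (Connection conn = DriverManager.getConnection(
        "jdbc:postgresql://localhost:5432/druid", "druid", "diurd")) {
      try (PreparedStatement stmt = conn.prepareStatement(
          "UPDATE druid_segments SET used=true WHERE id = ?")) {
        stmt.setString(1, "someSegmentId"); // placeholder id
        stmt.executeUpdate();
      }
    }
  }
}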

@ -712,7 +712,7 @@ public class RealtimePlumberSchool implements PlumberSchool
handoffCondition.notifyAll(); handoffCondition.notifyAll();
} }
} }
    catch (Exception e) {
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
.addData("interval", sink.getInterval()) .addData("interval", sink.getInterval())
.emit(); .emit();

@ -28,8 +28,8 @@ import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
*/ */
@ -40,7 +40,7 @@ public class StatusResource
@Produces("application/json") @Produces("application/json")
public Status doGet() public Status doGet()
{ {
    return new Status(Initialization.getLoadedModules(DruidModule.class));
} }
public static class Status public static class Status
@ -49,13 +49,18 @@ public class StatusResource
final List<ModuleVersion> modules; final List<ModuleVersion> modules;
final Memory memory; final Memory memory;
    public Status(Collection<DruidModule> modules)
    {
      this.version = getDruidVersion();
      this.modules = getExtensionVersions(modules);
      this.memory = new Memory(Runtime.getRuntime());
    }
private String getDruidVersion()
{
return Status.class.getPackage().getImplementationVersion();
}
@JsonProperty @JsonProperty
public String getVersion() public String getVersion()
{ {
@ -98,9 +103,8 @@ public class StatusResource
* *
* @return map of extensions loaded with their respective implementation versions. * @return map of extensions loaded with their respective implementation versions.
*/ */
    private List<ModuleVersion> getExtensionVersions(Collection<DruidModule> druidModules)
    {
List<ModuleVersion> moduleVersions = new ArrayList<>(); List<ModuleVersion> moduleVersions = new ArrayList<>();
for (DruidModule module : druidModules) { for (DruidModule module : druidModules) {
String artifact = module.getClass().getPackage().getImplementationTitle(); String artifact = module.getClass().getPackage().getImplementationTitle();
@ -110,7 +114,6 @@ public class StatusResource
} }
return moduleVersions; return moduleVersions;
} }
} }
@JsonInclude(JsonInclude.Include.NON_NULL) @JsonInclude(JsonInclude.Include.NON_NULL)
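With the module collection passed in through its constructor, Status no longer reaches into the global Initialization registry on its own; callers decide what to report. A short usage sketch follows, assuming the druid server and initialization artifacts are on the classpath.

import io.druid.initialization.DruidModule;
import io.druid.initialization.Initialization;
import io.druid.server.StatusResource;

import java.util.Collection;

public class StatusSketch
{
  public static void main(String[] args)
  {
    // Production callers pass the modules that were actually loaded;
    // tests can pass a fixed list instead (see StatusResourceTest further down).
    Collection<DruidModule> modules = Initialization.getLoadedModules(DruidModule.class);
    System.out.println(new StatusResource.Status(modules).getVersion());
  }
}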

@ -0,0 +1,152 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.initialization;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import io.druid.server.initialization.ExtensionsConfig;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class InitializationTest
{
private String oldService;
private String oldHost;
private String oldPort;
@Before
public void messWithSystemProperties()
{
// required to test Initialization.makeInjectorWithModules
oldService = System.setProperty("druid.service", "test-service");
oldHost = System.setProperty("druid.host", "test-host");
oldPort = System.setProperty("druid.port", "8080");
}
@After
public void cleanup()
{
System.setProperty("druid.service", oldService == null ? "" : oldService);
System.setProperty("druid.host", oldHost == null ? "" : oldHost);
System.setProperty("druid.port", oldPort == null ? "" : oldPort);
}
@Test
public void test01InitialModulesEmpty() throws Exception
{
Assert.assertEquals(
"Initial set of loaded modules must be empty",
0,
Initialization.getLoadedModules(DruidModule.class).size()
);
}
@Test
public void test02MakeStartupInjector() throws Exception
{
Injector startupInjector = Initialization.makeStartupInjector();
Assert.assertNotNull(startupInjector);
Assert.assertNotNull(startupInjector.getInstance(ObjectMapper.class));
}
@Test
public void test03ClassLoaderExtensionsLoading()
{
Injector startupInjector = Initialization.makeStartupInjector();
Function<DruidModule, String> fnClassName = new Function<DruidModule, String>()
{
@Nullable
@Override
public String apply(@Nullable DruidModule input)
{
return input.getClass().getCanonicalName();
}
};
Assert.assertFalse(
"modules does not contain TestDruidModule",
Collections2.transform(Initialization.getLoadedModules(DruidModule.class), fnClassName)
.contains("io.druid.initialization.InitializationTest.TestDruidModule")
);
Collection<DruidModule> modules = Initialization.getFromExtensions(
startupInjector.getInstance(ExtensionsConfig.class),
DruidModule.class
);
Assert.assertTrue(
"modules contains TestDruidModule",
Collections2.transform(modules, fnClassName)
.contains("io.druid.initialization.InitializationTest.TestDruidModule")
);
}
@Test
public void test04MakeInjectorWithModules() throws Exception
{
Injector startupInjector = Initialization.makeStartupInjector();
Injector injector = Initialization.makeInjectorWithModules(startupInjector, ImmutableList.of());
Assert.assertNotNull(injector);
}
@Test
public void testGetLoadedModules()
{
Set<DruidModule> modules = Initialization.getLoadedModules(DruidModule.class);
Set<DruidModule> loadedModules = Initialization.getLoadedModules(DruidModule.class);
Assert.assertEquals("Set from loaded modules #1 should be same!", modules, loadedModules);
Set<DruidModule> loadedModules2 = Initialization.getLoadedModules(DruidModule.class);
Assert.assertEquals("Set from loaded modules #2 should be same!", modules, loadedModules2);
}
public static class TestDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
// Do nothing
}
}
}

@ -19,19 +19,14 @@
package io.druid.server;

import com.google.common.collect.ImmutableList;
import io.druid.initialization.DruidModule;
import io.druid.initialization.InitializationTest;
import junit.framework.Assert;
import org.junit.Test;

import java.util.Collection;
import java.util.List;

import static io.druid.server.StatusResource.ModuleVersion;

@ -39,29 +34,14 @@ import static io.druid.server.StatusResource.ModuleVersion;
 */
public class StatusResourceTest
{
  @Test
  public void testLoadedModules()
  {
    Collection<DruidModule> modules = ImmutableList.of((DruidModule) new InitializationTest.TestDruidModule());
    List<ModuleVersion> statusResourceModuleList = new StatusResource.Status(modules).getModules();

    Assert.assertEquals("Status should have all modules loaded!", modules.size(), statusResourceModuleList.size());

    for (DruidModule module : modules) {
      String moduleName = module.getClass().getCanonicalName();
@ -74,30 +54,6 @@ public class StatusResourceTest
      }
      Assert.assertTrue("Status resource should contain module " + moduleName, contains);
    }
  }
}

@ -1 +1 @@
io.druid.server.StatusResourceTest$TestDruidModule io.druid.initialization.InitializationTest$TestDruidModule

@ -27,7 +27,7 @@
<parent> <parent>
<groupId>io.druid</groupId> <groupId>io.druid</groupId>
<artifactId>druid</artifactId> <artifactId>druid</artifactId>
<version>0.6.43-SNAPSHOT</version> <version>0.6.47-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>

@ -53,7 +53,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "broker", name = "broker",
description = "Runs a broker node, see http://druid.io/docs/0.6.40/Broker.html for a description" description = "Runs a broker node, see http://druid.io/docs/0.6.46/Broker.html for a description"
) )
public class CliBroker extends ServerRunnable public class CliBroker extends ServerRunnable
{ {

@ -60,7 +60,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "coordinator", name = "coordinator",
description = "Runs the Coordinator, see http://druid.io/docs/0.6.40/Coordinator.html for a description." description = "Runs the Coordinator, see http://druid.io/docs/0.6.46/Coordinator.html for a description."
) )
public class CliCoordinator extends ServerRunnable public class CliCoordinator extends ServerRunnable
{ {

@ -41,7 +41,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "hadoop", name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.40/Batch-ingestion.html for a description." description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.46/Batch-ingestion.html for a description."
) )
public class CliHadoopIndexer implements Runnable public class CliHadoopIndexer implements Runnable
{ {

@ -42,7 +42,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "historical", name = "historical",
description = "Runs a Historical node, see http://druid.io/docs/0.6.40/Historical.html for a description" description = "Runs a Historical node, see http://druid.io/docs/0.6.46/Historical.html for a description"
) )
public class CliHistorical extends ServerRunnable public class CliHistorical extends ServerRunnable
{ {

@ -93,7 +93,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "overlord", name = "overlord",
description = "Runs an Overlord node, see http://druid.io/docs/0.6.40/Indexing-Service.html for a description" description = "Runs an Overlord node, see http://druid.io/docs/0.6.46/Indexing-Service.html for a description"
) )
public class CliOverlord extends ServerRunnable public class CliOverlord extends ServerRunnable
{ {

@ -30,7 +30,7 @@ import java.util.List;
*/ */
@Command( @Command(
name = "realtime", name = "realtime",
description = "Runs a realtime node, see http://druid.io/docs/0.6.40/Realtime.html for a description" description = "Runs a realtime node, see http://druid.io/docs/0.6.46/Realtime.html for a description"
) )
public class CliRealtime extends ServerRunnable public class CliRealtime extends ServerRunnable
{ {

@ -42,7 +42,7 @@ import java.util.concurrent.Executor;
*/ */
@Command( @Command(
name = "realtime", name = "realtime",
description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.40/Realtime.html for a description" description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.46/Realtime.html for a description"
) )
public class CliRealtimeExample extends ServerRunnable public class CliRealtimeExample extends ServerRunnable
{ {

@ -20,6 +20,8 @@
package io.druid.cli; package io.druid.cli;
import io.airlift.command.Command; import io.airlift.command.Command;
import io.druid.initialization.DruidModule;
import io.druid.initialization.Initialization;
import io.druid.server.StatusResource; import io.druid.server.StatusResource;
@Command( @Command(
@ -31,6 +33,6 @@ public class Version implements Runnable
@Override @Override
public void run() public void run()
{ {
    System.out.println(new StatusResource.Status(Initialization.getLoadedModules(DruidModule.class)));
} }
} }