diff --git a/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java b/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java
index e0a73b4bc3c..7786d2c01bd 100644
--- a/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java
+++ b/client/src/main/java/com/metamx/druid/query/search/FragmentSearchQuerySpec.java
@@ -73,7 +73,7 @@ public class FragmentSearchQuerySpec implements SearchQuerySpec
public boolean accept(String dimVal)
{
for (String value : values) {
- if (!dimVal.toLowerCase().contains(value)) {
+ if (dimVal == null || !dimVal.toLowerCase().contains(value)) {
return false;
}
}
diff --git a/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java b/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java
index 87a6246a5b2..1de1c7360fa 100644
--- a/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java
+++ b/client/src/main/java/com/metamx/druid/query/search/InsensitiveContainsSearchQuerySpec.java
@@ -59,6 +59,9 @@ public class InsensitiveContainsSearchQuerySpec implements SearchQuerySpec
@Override
public boolean accept(String dimVal)
{
+ if (dimVal == null) {
+ return false;
+ }
return dimVal.toLowerCase().contains(value);
}
@@ -77,8 +80,8 @@ public class InsensitiveContainsSearchQuerySpec implements SearchQuerySpec
public String toString()
{
return "InsensitiveContainsSearchQuerySpec{" +
- "value=" + value +
- ", sortSpec=" + sortSpec +
+ "value=" + value +
+ ", sortSpec=" + sortSpec +
"}";
}
}
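
Both hunks above apply the same fix: dimension values coming out of the index can be null, and calling toLowerCase() on them threw a NullPointerException; a null dimension value now simply fails to match. A minimal sketch of the resulting behavior, assuming the spec stores its search value lower-cased and exposes the (value, sortSpec) constructor implied by its fields:

    SearchQuerySpec spec = new InsensitiveContainsSearchQuerySpec("billy", null);
    spec.accept(null);    // previously threw NullPointerException; now returns false
    spec.accept("Billy"); // true: comparison is case-insensitive
    spec.accept("sally"); // false
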
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
index 32acc66ae43..c2a201c9632 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
@@ -19,27 +19,21 @@
package com.metamx.druid.merger.coordinator;
-import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
-import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskHolder;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
-import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
-import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger;
@@ -52,29 +46,25 @@ import com.netflix.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
-import org.joda.time.Duration;
-import org.joda.time.Period;
-import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
- * The RemoteTaskRunner encapsulates all interactions with Zookeeper and keeps track of which workers
- * are running which tasks. The RemoteTaskRunner is event driven and updates state according to ephemeral node
- * changes in ZK.
+ * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes and manage retries in failure
+ * scenarios. The RemoteTaskRunner keeps track of which workers are running which tasks and manages coordinator and
+ * worker interactions over Zookeeper. The RemoteTaskRunner is event-driven and updates state according to ephemeral
+ * node changes in ZK.
*
- * The RemoteTaskRunner will assign tasks to a node until the node hits capacity. RemoteTaskRunners have scaling
- * strategies to help them decide when to create or delete new resources. When tasks are assigned to the remote
- * task runner and no workers have capacity to handle the task, provisioning will be done according to the strategy.
- * The remote task runner periodically runs a check to see if any worker nodes have not had any work for a
- * specified period of time. If so, the worker node will be terminated.
+ * The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
+ * fail. The RemoteTaskRunner depends on another manager to create additional worker resources.
+ * For example, {@link com.metamx.druid.merger.coordinator.scaling.ResourceManagmentScheduler} can take care of these duties.
*
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks
* that were associated with the node.
@@ -90,7 +80,6 @@ public class RemoteTaskRunner implements TaskRunner
private final PathChildrenCache workerPathCache;
private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory;
- private final ScalingStrategy strategy;
private final WorkerSetupManager workerSetupManager;
// all workers that exist in ZK
@@ -98,12 +87,8 @@ public class RemoteTaskRunner implements TaskRunner
// all tasks that are assigned or need to be assigned
private final Map<String, TaskWrapper> tasks = new ConcurrentHashMap<String, TaskWrapper>();
- private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
- private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
private final Object statusLock = new Object();
- private volatile DateTime lastProvisionTime = new DateTime();
- private volatile DateTime lastTerminateTime = new DateTime();
private volatile boolean started = false;
public RemoteTaskRunner(
@@ -113,7 +98,6 @@ public class RemoteTaskRunner implements TaskRunner
PathChildrenCache workerPathCache,
ScheduledExecutorService scheduledExec,
RetryPolicyFactory retryPolicyFactory,
- ScalingStrategy strategy,
WorkerSetupManager workerSetupManager
)
{
@@ -123,7 +107,6 @@ public class RemoteTaskRunner implements TaskRunner
this.workerPathCache = workerPathCache;
this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory;
- this.strategy = strategy;
this.workerSetupManager = workerSetupManager;
}
@@ -131,6 +114,10 @@ public class RemoteTaskRunner implements TaskRunner
public void start()
{
try {
+ if (started) {
+ return;
+ }
+
workerPathCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@@ -157,81 +144,6 @@ public class RemoteTaskRunner implements TaskRunner
);
workerPathCache.start();
- // Schedule termination of worker nodes periodically
- Period period = new Period(config.getTerminateResourcesDuration());
- PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null);
- final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
-
- ScheduledExecutors.scheduleAtFixedRate(
- scheduledExec,
- new Duration(
- System.currentTimeMillis(),
- startTime
- ),
- config.getTerminateResourcesDuration(),
- new Runnable()
- {
- @Override
- public void run()
- {
- if (currentlyTerminating.isEmpty()) {
- final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers();
- if (zkWorkers.size() <= minNumWorkers) {
- return;
- }
-
- List<WorkerWrapper> thoseLazyWorkers = Lists.newArrayList(
- FunctionalIterable
- .create(zkWorkers.values())
- .filter(
- new Predicate<WorkerWrapper>()
- {
- @Override
- public boolean apply(WorkerWrapper input)
- {
- return input.getRunningTasks().isEmpty()
- && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
- > config.getMaxWorkerIdleTimeMillisBeforeDeletion();
- }
- }
- )
- );
-
- AutoScalingData terminated = strategy.terminate(
- Lists.transform(
- thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size() - 1),
- new Function<WorkerWrapper, String>()
- {
- @Override
- public String apply(WorkerWrapper input)
- {
- return input.getWorker().getIp();
- }
- }
- )
- );
-
- if (terminated != null) {
- currentlyTerminating.addAll(terminated.getNodeIds());
- lastTerminateTime = new DateTime();
- }
- } else {
- Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
- if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
- log.makeAlert("Worker node termination taking too long")
- .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
- .addData("terminatingCount", currentlyTerminating.size())
- .emit();
- }
-
- log.info(
- "%s still terminating. Wait for all nodes to terminate before trying again.",
- currentlyTerminating
- );
- }
- }
- }
- );
started = true;
}
catch (Exception e) {
@@ -243,6 +155,10 @@ public class RemoteTaskRunner implements TaskRunner
public void stop()
{
try {
+ if (!started) {
+ return;
+ }
+
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
workerWrapper.close();
}
@@ -255,16 +171,16 @@ public class RemoteTaskRunner implements TaskRunner
}
}
- public boolean hasStarted()
- {
- return started;
- }
-
- public int getNumWorkers()
+ public int getNumAvailableWorkers()
{
return zkWorkers.size();
}
+ public Collection<WorkerWrapper> getAvailableWorkers()
+ {
+ return zkWorkers.values();
+ }
+
public boolean isTaskRunning(String taskId)
{
for (WorkerWrapper workerWrapper : zkWorkers.values()) {
@@ -275,6 +191,13 @@ public class RemoteTaskRunner implements TaskRunner
return false;
}
+ /**
+ * A task will only be run if the RemoteTaskRunner has no current knowledge of it.
+ *
+ * @param task task to run
+ * @param context task context to run under
+ * @param callback callback to be called exactly once
+ */
@Override
public void run(Task task, TaskContext context, TaskCallback callback)
{
@@ -288,11 +211,18 @@ public class RemoteTaskRunner implements TaskRunner
assignTask(taskWrapper);
}
+ /**
+ * Ensures no worker is already running the task before assigning it to one.
+ * It is possible that a worker is running a task the RTR has no knowledge of. This is common when the RTR
+ * needs to bootstrap after a restart.
+ *
+ * @param taskWrapper - a wrapper containing task metadata
+ */
private void assignTask(TaskWrapper taskWrapper)
{
WorkerWrapper workerWrapper = findWorkerRunningTask(taskWrapper);
- // If the task already exists, we don't need to announce it
+ // If a worker is already running this task, we don't need to announce it
if (workerWrapper != null) {
final Worker worker = workerWrapper.getWorker();
try {
@@ -395,8 +325,6 @@ public class RemoteTaskRunner implements TaskRunner
private void addWorker(final Worker worker)
{
try {
- currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.asList(worker.getIp())));
-
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final WorkerWrapper workerWrapper = new WorkerWrapper(
@@ -460,12 +388,12 @@ public class RemoteTaskRunner implements TaskRunner
} else {
final TaskCallback callback = taskWrapper.getCallback();
- // Cleanup
- if (callback != null) {
- callback.notify(taskStatus);
- }
-
if (taskStatus.isComplete()) {
+ // Cleanup
+ if (callback != null) {
+ callback.notify(taskStatus);
+ }
+
// Worker is done with this task
workerWrapper.setLastCompletedTaskTime(new DateTime());
tasks.remove(taskId);
@@ -510,8 +438,6 @@ public class RemoteTaskRunner implements TaskRunner
*/
private void removeWorker(final Worker worker)
{
- currentlyTerminating.remove(worker.getHost());
-
WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost());
if (workerWrapper != null) {
try {
@@ -564,27 +490,6 @@ public class RemoteTaskRunner implements TaskRunner
if (workerQueue.isEmpty()) {
log.info("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
-
- if (currentlyProvisioning.isEmpty()) {
- AutoScalingData provisioned = strategy.provision();
- if (provisioned != null) {
- currentlyProvisioning.addAll(provisioned.getNodeIds());
- lastProvisionTime = new DateTime();
- }
- } else {
- Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
- if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
- log.makeAlert("Worker node provisioning taking too long")
- .addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
- .addData("provisioningCount", currentlyProvisioning.size())
- .emit();
- }
-
- log.info(
- "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
- currentlyProvisioning
- );
- }
return null;
}
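
One behavioral change in the status-handling hunk above is easy to miss: the callback is now notified only when a task status is complete, instead of on every status update. A sketch of the resulting contract, assuming TaskCallback is the single-method interface its usage here suggests:

    TaskCallback callback = new TaskCallback()
    {
      @Override
      public void notify(TaskStatus status)
      {
        // after this change, status.isComplete() is always true here,
        // and notify fires at most once per task
      }
    };
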
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java
index e228b401025..90f19000e60 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java
@@ -42,6 +42,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -376,6 +377,12 @@ public class TaskQueue
}
}
+ public Collection<Task> getAvailableTasks()
+ {
+ // TODO: actually implement this
+ return Lists.newArrayList();
+ }
+
/**
* Unlock some work. Does not update the task storage facility. Throws an exception if this work is not currently
* running.
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
index 3d084d60712..242c7873b6d 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
@@ -75,8 +75,12 @@ import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
-import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy;
-import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
+import com.metamx.druid.merger.coordinator.scaling.NoopAutoScalingStrategy;
+import com.metamx.druid.merger.coordinator.scaling.ResourceManagementSchedulerConfig;
+import com.metamx.druid.merger.coordinator.scaling.ResourceManagmentScheduler;
+import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
+import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagementStrategy;
+import com.metamx.druid.merger.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
@@ -529,35 +533,19 @@ public class IndexerCoordinatorNode extends RegisteringNode
.build()
);
- ScalingStrategy strategy;
- if (config.getStrategyImpl().equalsIgnoreCase("ec2")) {
- strategy = new EC2AutoScalingStrategy(
- jsonMapper,
- new AmazonEC2Client(
- new BasicAWSCredentials(
- PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
- PropUtils.getProperty(props, "com.metamx.aws.secretKey")
- )
- ),
- configFactory.build(EC2AutoScalingStrategyConfig.class),
- workerSetupManager
- );
- } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
- strategy = new NoopScalingStrategy();
- } else {
- throw new ISE("Invalid strategy implementation: %s", config.getStrategyImpl());
- }
-
- return new RemoteTaskRunner(
+ RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
configFactory.build(RemoteTaskRunnerConfig.class),
curatorFramework,
new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true),
retryScheduledExec,
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
- strategy,
workerSetupManager
);
+
+ initializeWorkerScaling(remoteTaskRunner);
+
+ return remoteTaskRunner;
}
};
@@ -577,6 +565,49 @@ public class IndexerCoordinatorNode extends RegisteringNode
}
}
+ private void initializeWorkerScaling(RemoteTaskRunner taskRunner)
+ {
+ final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
+ 1,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("ScalingExec--%d")
+ .build()
+ );
+
+ AutoScalingStrategy strategy;
+ if (config.getStrategyImpl().equalsIgnoreCase("ec2")) {
+ strategy = new EC2AutoScalingStrategy(
+ jsonMapper,
+ new AmazonEC2Client(
+ new BasicAWSCredentials(
+ PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
+ PropUtils.getProperty(props, "com.metamx.aws.secretKey")
+ )
+ ),
+ configFactory.build(EC2AutoScalingStrategyConfig.class),
+ workerSetupManager
+ );
+ } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
+ strategy = new NoopAutoScalingStrategy();
+ } else {
+ throw new ISE("Invalid strategy implementation: %s", config.getStrategyImpl());
+ }
+
+ ResourceManagmentScheduler resourceManagmentScheduler = new ResourceManagmentScheduler(
+ taskQueue,
+ taskRunner,
+ new SimpleResourceManagementStrategy(
+ strategy,
+ configFactory.build(SimpleResourceManagmentConfig.class),
+ workerSetupManager
+ ),
+ configFactory.build(ResourceManagementSchedulerConfig.class),
+ scalingScheduledExec
+ );
+ lifecycle.addManagedInstance(resourceManagmentScheduler);
+ }
+
public static class Builder
{
private ObjectMapper jsonMapper = null;
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java
index 5a1bb4980e5..0ca74a9b38c 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java
@@ -43,4 +43,13 @@ public class AutoScalingData
{
return nodes;
}
+
+ @Override
+ public String toString()
+ {
+ return "AutoScalingData{" +
+ "nodeIds=" + nodeIds +
+ ", nodes=" + nodes +
+ '}';
+ }
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingStrategy.java
similarity index 81%
rename from merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java
rename to merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingStrategy.java
index 150de1357e0..7ab92a0b985 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingStrategy.java
@@ -22,8 +22,9 @@ package com.metamx.druid.merger.coordinator.scaling;
import java.util.List;
/**
+ * The AutoScalingStrategy defines the concrete operations used to provision and terminate worker nodes.
*/
-public interface ScalingStrategy
+public interface AutoScalingStrategy
{
public AutoScalingData provision();
@@ -31,8 +32,8 @@ public interface ScalingStrategy
/**
* Provides a lookup of ip addresses to node ids
- * @param ips
- * @return
+ * @param ips - node IPs
+ * @return node ids
*/
- public List<String> ipLookup(List<String> ips);
+ public List<String> ipToIdLookup(List<String> ips);
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java
index 2a50a8b55fd..d64899a2739 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java
@@ -43,7 +43,7 @@ import java.util.List;
/**
*/
-public class EC2AutoScalingStrategy implements ScalingStrategy
+public class EC2AutoScalingStrategy implements AutoScalingStrategy
{
private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class);
@@ -187,7 +187,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy
}
@Override
- public List<String> ipLookup(List<String> ips)
+ public List<String> ipToIdLookup(List<String> ips)
{
DescribeInstancesResult result = amazonEC2Client.describeInstances(
new DescribeInstancesRequest()
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopAutoScalingStrategy.java
similarity index 90%
rename from merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java
rename to merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopAutoScalingStrategy.java
index 2b412ca6202..d4a5f355c6f 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopAutoScalingStrategy.java
@@ -26,9 +26,9 @@ import java.util.List;
/**
* This class just logs when scaling should occur.
*/
-public class NoopScalingStrategy implements ScalingStrategy
+public class NoopAutoScalingStrategy implements AutoScalingStrategy
{
- private static final EmittingLogger log = new EmittingLogger(NoopScalingStrategy.class);
+ private static final EmittingLogger log = new EmittingLogger(NoopAutoScalingStrategy.class);
@Override
public AutoScalingData provision()
@@ -45,7 +45,7 @@ public class NoopScalingStrategy implements ScalingStrategy
}
@Override
- public List<String> ipLookup(List<String> ips)
+ public List<String> ipToIdLookup(List<String> ips)
{
log.info("I'm not a real strategy so I'm returning what I got %s", ips);
return ips;
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementSchedulerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementSchedulerConfig.java
new file mode 100644
index 00000000000..c732b5c6c1a
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementSchedulerConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.scaling;
+
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+/**
+ */
+public abstract class ResourceManagementSchedulerConfig
+{
+ @Config("druid.indexer.provisionResources.duration")
+ @Default("PT1H")
+ public abstract Duration getProvisionResourcesDuration();
+
+ @Config("druid.indexer.terminateResources.duration")
+ @Default("PT1H")
+ public abstract Duration getTerminateResourcesDuration();
+
+ @Config("druid.indexer.terminateResources.originDateTime")
+ @Default("2012-01-01T00:55:00.000Z")
+ public abstract DateTime getTerminateResourcesOriginDateTime();
+}
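
These skife config-magic bindings resolve from runtime properties. A minimal sketch of how such a config object gets materialized (the property value is hypothetical; anything unset falls back to its @Default):

    import java.util.Properties;
    import org.skife.config.ConfigurationObjectFactory;

    Properties props = new Properties();
    props.setProperty("druid.indexer.provisionResources.duration", "PT30M");

    ResourceManagementSchedulerConfig config =
        new ConfigurationObjectFactory(props).build(ResourceManagementSchedulerConfig.class);
    config.getProvisionResourcesDuration(); // PT30M
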
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementStrategy.java
new file mode 100644
index 00000000000..37483082e98
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagementStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.scaling;
+
+import com.metamx.druid.merger.common.task.Task;
+import com.metamx.druid.merger.coordinator.WorkerWrapper;
+
+import java.util.Collection;
+
+/**
+ * The ResourceManagementStrategy decides whether worker nodes should be provisioned or terminated
+ * based on the available tasks in the system and the state of the workers in the system.
+ */
+public interface ResourceManagementStrategy
+{
+ public void doProvision(Collection<Task> availableTasks, Collection<WorkerWrapper> workerWrappers);
+
+ public void doTerminate(Collection<Task> availableTasks, Collection<WorkerWrapper> workerWrappers);
+
+ public ScalingStats getStats();
+}
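
A minimal no-op implementation of the interface, using the Collection<Task> and Collection<WorkerWrapper> signatures above (the class name is hypothetical; NoopAutoScalingStrategy plays the analogous role one level down):

    import com.metamx.druid.merger.common.task.Task;
    import com.metamx.druid.merger.coordinator.WorkerWrapper;

    import java.util.Collection;

    public class NoopResourceManagementStrategy implements ResourceManagementStrategy
    {
      private final ScalingStats scalingStats = new ScalingStats(1);

      @Override
      public void doProvision(Collection<Task> availableTasks, Collection<WorkerWrapper> workerWrappers)
      {
        // never provisions
      }

      @Override
      public void doTerminate(Collection<Task> availableTasks, Collection<WorkerWrapper> workerWrappers)
      {
        // never terminates
      }

      @Override
      public ScalingStats getStats()
      {
        return scalingStats;
      }
    }
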
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagmentScheduler.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagmentScheduler.java
new file mode 100644
index 00000000000..b2ae623b978
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ResourceManagmentScheduler.java
@@ -0,0 +1,138 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.scaling;
+
+import com.metamx.common.concurrent.ScheduledExecutors;
+import com.metamx.common.lifecycle.LifecycleStart;
+import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.PeriodGranularity;
+import com.metamx.druid.merger.coordinator.RemoteTaskRunner;
+import com.metamx.druid.merger.coordinator.TaskQueue;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * The ResourceManagmentScheduler manages when worker nodes should potentially be created or destroyed.
+ * It uses a {@link TaskQueue} to obtain the available tasks in the system and a {@link RemoteTaskRunner} to obtain
+ * the status of the worker nodes in the system.
+ * The ResourceManagmentScheduler does not itself decide whether provisioning or termination should actually occur.
+ * That decision is made in the {@link ResourceManagementStrategy}.
+ */
+public class ResourceManagmentScheduler
+{
+ private static final Logger log = new Logger(ResourceManagmentScheduler.class);
+
+ private final TaskQueue taskQueue;
+ private final RemoteTaskRunner remoteTaskRunner;
+ private final ResourceManagementStrategy resourceManagementStrategy;
+ private final ResourceManagementSchedulerConfig config;
+ private final ScheduledExecutorService exec;
+
+ private final Object lock = new Object();
+ private volatile boolean started = false;
+
+ public ResourceManagmentScheduler(
+ TaskQueue taskQueue,
+ RemoteTaskRunner remoteTaskRunner,
+ ResourceManagementStrategy resourceManagementStrategy,
+ ResourceManagementSchedulerConfig config,
+ ScheduledExecutorService exec
+ )
+ {
+ this.taskQueue = taskQueue;
+ this.remoteTaskRunner = remoteTaskRunner;
+ this.resourceManagementStrategy = resourceManagementStrategy;
+ this.config = config;
+ this.exec = exec;
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ synchronized (lock) {
+ if (started) {
+ return;
+ }
+
+ ScheduledExecutors.scheduleAtFixedRate(
+ exec,
+ config.getProvisionResourcesDuration(),
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ resourceManagementStrategy.doProvision(
+ taskQueue.getAvailableTasks(),
+ remoteTaskRunner.getAvailableWorkers()
+ );
+ }
+ }
+ );
+
+ // Schedule termination of worker nodes periodically
+ Period period = new Period(config.getTerminateResourcesDuration());
+ PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null);
+ final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
+
+ ScheduledExecutors.scheduleAtFixedRate(
+ exec,
+ new Duration(
+ System.currentTimeMillis(),
+ startTime
+ ),
+ config.getTerminateResourcesDuration(),
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ resourceManagementStrategy.doTerminate(
+ taskQueue.getAvailableTasks(),
+ remoteTaskRunner.getAvailableWorkers()
+ );
+ }
+ }
+ );
+
+ started = true;
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ synchronized (lock) {
+ if (!started) {
+ return;
+ }
+ exec.shutdown();
+ }
+ }
+
+ public ScalingStats getStats()
+ {
+ return resourceManagementStrategy.getStats();
+ }
+}
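
End to end, the scheduler is wired exactly the way initializeWorkerScaling(...) does it in IndexerCoordinatorNode above; a condensed usage sketch, with taskQueue, remoteTaskRunner, strategy, configFactory, and workerSetupManager assumed in scope:

    ResourceManagmentScheduler scheduler = new ResourceManagmentScheduler(
        taskQueue,
        remoteTaskRunner,
        new SimpleResourceManagementStrategy(
            strategy,
            configFactory.build(SimpleResourceManagmentConfig.class),
            workerSetupManager
        ),
        configFactory.build(ResourceManagementSchedulerConfig.class),
        Executors.newSingleThreadScheduledExecutor()
    );
    scheduler.start(); // schedules the periodic doProvision and doTerminate runs
    // ...
    scheduler.stop();  // shuts the executor down; the instance cannot be restarted
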
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java
new file mode 100644
index 00000000000..d632a61baae
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java
@@ -0,0 +1,88 @@
+package com.metamx.druid.merger.coordinator.scaling;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.MinMaxPriorityQueue;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeComparator;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ */
+public class ScalingStats
+{
+ private static enum EVENT
+ {
+ PROVISION,
+ TERMINATE
+ }
+
+ private final MinMaxPriorityQueue<ScalingEvent> recentNodes;
+
+ public ScalingStats(int capacity)
+ {
+ // order events newest-first (DateTimeComparator alone cannot compare ScalingEvent objects)
+ this.recentNodes = MinMaxPriorityQueue
+ .orderedBy(
+ new Comparator<ScalingEvent>()
+ {
+ @Override
+ public int compare(ScalingEvent s1, ScalingEvent s2)
+ {
+ return DateTimeComparator.getInstance().compare(s2.timestamp, s1.timestamp);
+ }
+ }
+ )
+ .maximumSize(capacity)
+ .create();
+ }
+
+ public void addProvisionEvent(AutoScalingData data)
+ {
+ recentNodes.add(
+ new ScalingEvent(
+ data,
+ new DateTime(),
+ EVENT.PROVISION
+ )
+ );
+ }
+
+ public void addTerminateEvent(AutoScalingData data)
+ {
+ recentNodes.add(
+ new ScalingEvent(
+ data,
+ new DateTime(),
+ EVENT.TERMINATE
+ )
+ );
+ }
+
+ public List<ScalingEvent> toList()
+ {
+ List<ScalingEvent> retVal = Lists.newArrayList();
+ while (!recentNodes.isEmpty()) {
+ retVal.add(recentNodes.poll());
+ }
+ return retVal;
+ }
+
+ public static class ScalingEvent
+ {
+ private final AutoScalingData data;
+ private final DateTime timestamp;
+ private final EVENT event;
+
+ private ScalingEvent(
+ AutoScalingData data,
+ DateTime timestamp,
+ EVENT event
+ )
+ {
+ this.data = data;
+ this.timestamp = timestamp;
+ this.event = event;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ScalingEvent{" +
+ "data=" + data +
+ ", timestamp=" + timestamp +
+ ", event=" + event +
+ '}';
+ }
+ }
+}
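
A small usage sketch of the stats container (the AutoScalingData arguments are hypothetical). The bounded MinMaxPriorityQueue keeps only the capacity most recent events, and toList() drains the queue, so it is a one-shot read:

    ScalingStats stats = new ScalingStats(2);  // retain at most 2 recent events
    stats.addProvisionEvent(provisionedData);  // AutoScalingData returned by provision()
    stats.addTerminateEvent(terminatedData);   // AutoScalingData returned by terminate(...)
    for (ScalingStats.ScalingEvent event : stats.toList()) {
      System.out.println(event);               // relies on ScalingEvent.toString()
    }
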
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java
new file mode 100644
index 00000000000..99c581e5cce
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java
@@ -0,0 +1,222 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.scaling;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.druid.merger.common.task.Task;
+import com.metamx.druid.merger.coordinator.WorkerWrapper;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
+import com.metamx.emitter.EmittingLogger;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ */
+public class SimpleResourceManagementStrategy implements ResourceManagementStrategy
+{
+ private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);
+
+ private final AutoScalingStrategy autoScalingStrategy;
+ private final SimpleResourceManagmentConfig config;
+ private final WorkerSetupManager workerSetupManager;
+ private final ScalingStats scalingStats;
+
+ private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
+ private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>();
+
+ private volatile DateTime lastProvisionTime = new DateTime();
+ private volatile DateTime lastTerminateTime = new DateTime();
+
+ public SimpleResourceManagementStrategy(
+ AutoScalingStrategy autoScalingStrategy,
+ SimpleResourceManagmentConfig config,
+ WorkerSetupManager workerSetupManager
+ )
+ {
+ this.autoScalingStrategy = autoScalingStrategy;
+ this.config = config;
+ this.workerSetupManager = workerSetupManager;
+ this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
+ }
+
+ @Override
+ public void doProvision(Collection<Task> availableTasks, Collection<WorkerWrapper> workerWrappers)
+ {
+ boolean nothingProvisioning = Sets.difference(
+ currentlyProvisioning,
+ Sets.newHashSet(
+ autoScalingStrategy.ipToIdLookup(
+ Lists.newArrayList(
+ Iterables.transform(
+ workerWrappers, new Function<WorkerWrapper, String>()
+ {
+ @Override
+ public String apply(WorkerWrapper input)
+ {
+ return input.getWorker().getIp();
+ }
+ }
+ )
+ )
+ )
+ )
+ ).isEmpty();
+
+ boolean moreTasksThanWorkerCapacity = !Sets.difference(
+ Sets.newHashSet(availableTasks),
+ Sets.newHashSet(
+ Iterables.concat(
+ Iterables.transform(
+ workerWrappers,
+ new Function<WorkerWrapper, Set<String>>()
+ {
+ @Override
+ public Set<String> apply(WorkerWrapper input)
+ {
+ return input.getRunningTasks();
+ }
+ }
+ )
+ )
+ )
+ ).isEmpty();
+
+ if (nothingProvisioning && moreTasksThanWorkerCapacity) {
+ AutoScalingData provisioned = autoScalingStrategy.provision();
+
+ if (provisioned != null) {
+ currentlyProvisioning.addAll(provisioned.getNodeIds());
+ lastProvisionTime = new DateTime();
+ scalingStats.addProvisionEvent(provisioned);
+ }
+ } else {
+ Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime);
+ if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
+ log.makeAlert("Worker node provisioning taking too long")
+ .addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
+ .addData("provisioningCount", currentlyProvisioning.size())
+ .emit();
+ }
+
+ log.info(
+ "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
+ currentlyProvisioning
+ );
+ }
+ }
+
+ @Override
+ public void doTerminate(Collection<Task> availableTasks, Collection<WorkerWrapper> workerWrappers)
+ {
+ boolean nothingTerminating = Sets.difference(
+ currentlyTerminating,
+ Sets.newHashSet(
+ autoScalingStrategy.ipToIdLookup(
+ Lists.newArrayList(
+ Iterables.transform(
+ workerWrappers, new Function<WorkerWrapper, String>()
+ {
+ @Override
+ public String apply(WorkerWrapper input)
+ {
+ return input.getWorker().getIp();
+ }
+ }
+ )
+ )
+ )
+ )
+ ).isEmpty();
+
+ if (nothingTerminating) {
+ final int minNumWorkers = workerSetupManager.getWorkerSetupData().getMinNumWorkers();
+ if (workerWrappers.size() <= minNumWorkers) {
+ return;
+ }
+
+ List<WorkerWrapper> thoseLazyWorkers = Lists.newArrayList(
+ FunctionalIterable
+ .create(workerWrappers)
+ .filter(
+ new Predicate<WorkerWrapper>()
+ {
+ @Override
+ public boolean apply(WorkerWrapper input)
+ {
+ return input.getRunningTasks().isEmpty()
+ && System.currentTimeMillis() - input.getLastCompletedTaskTime()
+ .getMillis()
+ > config.getMaxWorkerIdleTimeMillisBeforeDeletion();
+ }
+ }
+ )
+ );
+
+ AutoScalingData terminated = autoScalingStrategy.terminate(
+ Lists.transform(
+ thoseLazyWorkers.subList(minNumWorkers, thoseLazyWorkers.size() - 1),
+ new Function<WorkerWrapper, String>()
+ {
+ @Override
+ public String apply(WorkerWrapper input)
+ {
+ return input.getWorker().getIp();
+ }
+ }
+ )
+ );
+
+ if (terminated != null) {
+ currentlyTerminating.addAll(terminated.getNodeIds());
+ lastTerminateTime = new DateTime();
+ scalingStats.addTerminateEvent(terminated);
+ }
+ } else {
+ Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime);
+ if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
+ log.makeAlert("Worker node termination taking too long")
+ .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
+ .addData("terminatingCount", currentlyTerminating.size())
+ .emit();
+ }
+
+ log.info(
+ "%s still terminating. Wait for all nodes to terminate before trying again.",
+ currentlyTerminating
+ );
+ }
+ }
+
+ @Override
+ public ScalingStats getStats()
+ {
+ return scalingStats;
+ }
+}
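
The pending-provision bookkeeping above boils down to a single Guava set computation: provisioning is considered settled only once every node id in currentlyProvisioning resolves from a live worker IP. A compact sketch with hypothetical ids:

    Set<String> currentlyProvisioning = Sets.newHashSet("i-123", "i-456");
    Set<String> aliveWorkerIds = Sets.newHashSet("i-123"); // via autoScalingStrategy.ipToIdLookup(workerIps)
    boolean nothingProvisioning =
        Sets.difference(currentlyProvisioning, aliveWorkerIds).isEmpty();
    // false here: "i-456" has not shown up as a live worker yet, so doProvision
    // waits (and eventually alerts) instead of requesting another node
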
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagmentConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagmentConfig.java
new file mode 100644
index 00000000000..a8a5b52ca89
--- /dev/null
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagmentConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.scaling;
+
+import org.joda.time.Duration;
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+/**
+ */
+public abstract class SimpleResourceManagmentConfig
+{
+ @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
+ @Default("600000")
+ public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
+
+ @Config("druid.indexer.maxScalingDuration")
+ @Default("PT1H")
+ public abstract Duration getMaxScalingDuration();
+
+ @Config("druid.indexer.numEventsToTrack")
+ @Default("20")
+ public abstract int getNumEventsToTrack();
+}
diff --git a/merger/src/test/java/com/metamx/druid/merger/TestTask.java b/merger/src/test/java/com/metamx/druid/merger/TestTask.java
new file mode 100644
index 00000000000..142d2c88254
--- /dev/null
+++ b/merger/src/test/java/com/metamx/druid/merger/TestTask.java
@@ -0,0 +1,71 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger;
+
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.merger.common.TaskCallback;
+import com.metamx.druid.merger.common.TaskStatus;
+import com.metamx.druid.merger.common.TaskToolbox;
+import com.metamx.druid.merger.common.task.DefaultMergeTask;
+import com.metamx.druid.merger.coordinator.TaskContext;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonTypeName;
+
+import java.util.List;
+
+/**
+ */
+@JsonTypeName("test")
+public class TestTask extends DefaultMergeTask
+{
+ private final String id;
+
+ public TestTask(
+ @JsonProperty("id") String id,
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("segments") List segments,
+ @JsonProperty("aggregations") List aggregators
+ )
+ {
+ super(dataSource, segments, aggregators);
+
+ this.id = id;
+ }
+
+ @Override
+ @JsonProperty
+ public String getId()
+ {
+ return id;
+ }
+
+ @Override
+ public Type getType()
+ {
+ return Type.TEST;
+ }
+
+ @Override
+ public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
+ {
+ return TaskStatus.success("task1");
+ }
+}
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
index ec53b0257b6..c1f264b87ef 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
@@ -7,6 +7,7 @@ import com.metamx.common.ISE;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
+import com.metamx.druid.merger.TestTask;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
@@ -14,11 +15,10 @@ import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.DefaultMergeTask;
import com.metamx.druid.merger.common.task.Task;
-import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
-import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
+import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.TaskMonitor;
@@ -326,7 +326,6 @@ public class RemoteTaskRunnerTest
pathChildrenCache,
scheduledExec,
new RetryPolicyFactory(new TestRetryPolicyConfig()),
- new TestScalingStrategy(),
workerSetupManager
);
@@ -337,7 +336,7 @@ public class RemoteTaskRunnerTest
jsonMapper.writeValueAsBytes(worker1)
);
int count = 0;
- while (remoteTaskRunner.getNumWorkers() == 0) {
+ while (remoteTaskRunner.getNumAvailableWorkers() == 0) {
Thread.sleep(500);
if (count > 10) {
throw new ISE("WTF?! Still can't find worker!");
@@ -367,27 +366,6 @@ public class RemoteTaskRunnerTest
}
}
- private static class TestScalingStrategy implements ScalingStrategy
- {
- @Override
- public AutoScalingData provision()
- {
- return null;
- }
-
- @Override
- public AutoScalingData terminate(List<String> nodeIds)
- {
- return null;
- }
-
- @Override
- public List<String> ipLookup(List<String> ips)
- {
- return ips;
- }
- }
-
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override
@@ -444,41 +422,4 @@ public class RemoteTaskRunnerTest
return 1000;
}
}
-
- @JsonTypeName("test")
- private static class TestTask extends DefaultMergeTask
- {
- private final String id;
-
- public TestTask(
- @JsonProperty("id") String id,
- @JsonProperty("dataSource") String dataSource,
- @JsonProperty("segments") List segments,
- @JsonProperty("aggregations") List aggregators
- )
- {
- super(dataSource, segments, aggregators);
-
- this.id = id;
- }
-
- @Override
- @JsonProperty
- public String getId()
- {
- return id;
- }
-
- @Override
- public Type getType()
- {
- return Type.TEST;
- }
-
- @Override
- public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
- {
- return TaskStatus.success("task1");
- }
- }
}
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java
new file mode 100644
index 00000000000..10398da005f
--- /dev/null
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java
@@ -0,0 +1,142 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.scaling;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.merger.TestTask;
+import com.metamx.druid.merger.common.task.Task;
+import com.metamx.druid.merger.coordinator.WorkerWrapper;
+import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ */
+public class SimpleResourceManagementStrategyTest
+{
+ private AutoScalingStrategy autoScalingStrategy;
+ private WorkerSetupManager workerSetupManager;
+ private Task testTask;
+ private SimpleResourceManagementStrategy simpleResourceManagementStrategy;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
+ autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
+
+ testTask = new TestTask(
+ "task1",
+ "dummyDs",
+ Lists.newArrayList(
+ new DataSegment(
+ "dummyDs",
+ new Interval(new DateTime(), new DateTime()),
+ new DateTime().toString(),
+ null,
+ null,
+ null,
+ null,
+ 0
+ )
+ ), Lists.<AggregatorFactory>newArrayList()
+ );
+ simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
+ new TestAutoScalingStrategy(),
+ new SimpleResourceManagmentConfig()
+ {
+ @Override
+ public int getMaxWorkerIdleTimeMillisBeforeDeletion()
+ {
+ return 0;
+ }
+
+ @Override
+ public Duration getMaxScalingDuration()
+ {
+ return null;
+ }
+
+ @Override
+ public int getNumEventsToTrack()
+ {
+ return 1;
+ }
+ },
+ workerSetupManager
+ );
+ }
+
+ @Test
+ public void testSuccessfulProvision() throws Exception
+ {
+ EasyMock.expect(autoScalingStrategy.provision()).andReturn(
+ new AutoScalingData(Lists.<String>newArrayList(), Lists.newArrayList())
+ );
+ EasyMock.replay(autoScalingStrategy);
+
+ simpleResourceManagementStrategy.doProvision(
+ Arrays.asList(
+ testTask
+ ),
+ Arrays.asList(
+ new TestWorkerWrapper(testTask)
+ )
+ );
+
+ EasyMock.verify(autoScalingStrategy);
+ }
+
+ @Test
+ public void testDoTerminate() throws Exception
+ {
+
+ }
+
+ private static class TestWorkerWrapper extends WorkerWrapper
+ {
+ private final Task testTask;
+
+ private TestWorkerWrapper(
+ Task testTask
+ )
+ {
+ super(null, null, null);
+
+ this.testTask = testTask;
+ }
+
+ @Override
+ public Set<String> getRunningTasks()
+ {
+ return Sets.newHashSet(testTask.getId());
+ }
+ }
+}
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/TestAutoScalingStrategy.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/TestAutoScalingStrategy.java
new file mode 100644
index 00000000000..8213da61848
--- /dev/null
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/TestAutoScalingStrategy.java
@@ -0,0 +1,45 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.merger.coordinator.scaling;
+
+import java.util.List;
+
+/**
+ */
+public class TestAutoScalingStrategy implements AutoScalingStrategy
+{
+ @Override
+ public AutoScalingData provision()
+ {
+ return null;
+ }
+
+ @Override
+ public AutoScalingData terminate(List<String> ids)
+ {
+ return null;
+ }
+
+ @Override
+ public List<String> ipToIdLookup(List<String> ips)
+ {
+ return null;
+ }
+}