diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 5ed630e607b..5e8b31078af 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -439,7 +439,7 @@ public class RemoteTaskRunner implements TaskRunner if (workerQueue.isEmpty()) { log.makeAlert("There are no worker nodes with capacity to run task!").emit(); - strategy.provisionIfNeeded(zkWorkers); + strategy.provision(zkWorkers); return null; } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java index d878182b30c..81f9550a24e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/NoopScalingStrategy.java @@ -9,6 +9,7 @@ import com.metamx.common.guava.FunctionalIterable; import com.metamx.druid.merger.coordinator.WorkerWrapper; import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig; import com.metamx.emitter.EmittingLogger; +import org.joda.time.DateTime; import java.util.Comparator; import java.util.Map; @@ -35,7 +36,7 @@ public class NoopScalingStrategy implements ScalingStrategy } @Override - public void provisionIfNeeded(Map zkWorkers) + public void provision(Map zkWorkers) { synchronized (lock) { if (currentlyProvisioning != null) { @@ -48,26 +49,13 @@ public class NoopScalingStrategy implements ScalingStrategy } } - Iterable availableWorkers = FunctionalIterable.create(zkWorkers.values()).filter( - new Predicate() - { - @Override - public boolean apply(WorkerWrapper input) - { - return !input.isAtCapacity(); - } - } - ); - - if (Iterables.size(availableWorkers) == 0) { - try { - log.info("If I were a real strategy I'd create something now"); - currentlyProvisioning = "willNeverBeTrue"; - } - catch (Exception e) { - log.error(e, "Unable to create instance"); - currentlyProvisioning = null; - } + try { + log.info("If I were a real strategy I'd create something now"); + currentlyProvisioning = "willNeverBeTrue"; + } + catch (Exception e) { + log.error(e, "Unable to create instance"); + currentlyProvisioning = null; } } } @@ -89,9 +77,9 @@ public class NoopScalingStrategy implements ScalingStrategy @Override public int compare(WorkerWrapper w1, WorkerWrapper w2) { - return Ordering.natural() - .nullsFirst() - .compare(w1.getLastCompletedTaskTime(), w2.getLastCompletedTaskTime()); + DateTime w1Time = (w1 == null) ? new DateTime(0) : w1.getLastCompletedTaskTime(); + DateTime w2Time = (w2 == null) ? new DateTime(0) : w2.getLastCompletedTaskTime(); + return w1Time.compareTo(w2Time); } } ).create( diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategy.java index de00b7922a4..840dba3a2a8 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategy.java @@ -36,6 +36,7 @@ import com.metamx.common.guava.FunctionalIterable; import com.metamx.druid.merger.coordinator.WorkerWrapper; import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig; import com.metamx.emitter.EmittingLogger; +import org.joda.time.DateTime; import java.util.Arrays; import java.util.Comparator; @@ -65,7 +66,7 @@ public class S3AutoScalingStrategy implements ScalingStrategy } @Override - public void provisionIfNeeded(Map zkWorkers) + public void provision(Map zkWorkers) { synchronized (lock) { if (zkWorkers.containsKey(currentlyProvisioning)) { @@ -80,39 +81,26 @@ public class S3AutoScalingStrategy implements ScalingStrategy return; } - Iterable availableWorkers = FunctionalIterable.create(zkWorkers.values()).filter( - new Predicate() - { - @Override - public boolean apply(WorkerWrapper input) - { - return !input.isAtCapacity(); - } - } - ); + try { + log.info("Creating a new instance"); + RunInstancesResult result = amazonEC2Client.runInstances( + new RunInstancesRequest(config.getAmiId(), 1, 1) + .withInstanceType(InstanceType.fromValue(config.getInstanceType())) + ); - if (Iterables.size(availableWorkers) == 0) { - try { - log.info("Creating a new instance"); - RunInstancesResult result = amazonEC2Client.runInstances( - new RunInstancesRequest(config.getAmiId(), 1, 1) - .withInstanceType(InstanceType.fromValue(config.getInstanceType())) - ); - - if (result.getReservation().getInstances().size() != 1) { - throw new ISE("Created more than one instance, WTF?!"); - } - - Instance instance = result.getReservation().getInstances().get(0); - log.info("Created instance: %s", instance.getInstanceId()); - log.debug("%s", instance); - - currentlyProvisioning = String.format("%s:%s", instance.getPrivateIpAddress(), config.getWorkerPort()); - } - catch (Exception e) { - log.error(e, "Unable to create instance"); - currentlyProvisioning = null; + if (result.getReservation().getInstances().size() != 1) { + throw new ISE("Created more than one instance, WTF?!"); } + + Instance instance = result.getReservation().getInstances().get(0); + log.info("Created instance: %s", instance.getInstanceId()); + log.debug("%s", instance); + + currentlyProvisioning = String.format("%s:%s", instance.getPrivateIpAddress(), config.getWorkerPort()); + } + catch (Exception e) { + log.error(e, "Unable to create instance"); + currentlyProvisioning = null; } } } @@ -136,7 +124,9 @@ public class S3AutoScalingStrategy implements ScalingStrategy @Override public int compare(WorkerWrapper w1, WorkerWrapper w2) { - return w1.getLastCompletedTaskTime().compareTo(w2.getLastCompletedTaskTime()); + DateTime w1Time = (w1 == null) ? new DateTime(0) : w1.getLastCompletedTaskTime(); + DateTime w2Time = (w2 == null) ? new DateTime(0) : w2.getLastCompletedTaskTime(); + return w1Time.compareTo(w2Time); } } ).create( diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java index 7aba31b0c25..6a779927b87 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStrategy.java @@ -28,7 +28,7 @@ import java.util.Map; */ public interface ScalingStrategy { - public void provisionIfNeeded(Map zkWorkers); + public void provision(Map zkWorkers); public Instance terminateIfNeeded(Map zkWorkers); } diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategyTest.java index 0aab85d9cf1..de486cbee57 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/S3AutoScalingStrategyTest.java @@ -152,7 +152,7 @@ public class S3AutoScalingStrategyTest Assert.assertTrue(worker.isAtCapacity()); - strategy.provisionIfNeeded(zkWorkers); + strategy.provision(zkWorkers); worker.getRunningTasks().remove("task1"); worker.getRunningTasks().remove("task2");