fix bug with provision

This commit is contained in:
Fangjin Yang 2012-10-31 14:45:44 -07:00
parent 11c64593ae
commit 9547ea494d
5 changed files with 38 additions and 60 deletions

View File

@ -439,7 +439,7 @@ public class RemoteTaskRunner implements TaskRunner
if (workerQueue.isEmpty()) { if (workerQueue.isEmpty()) {
log.makeAlert("There are no worker nodes with capacity to run task!").emit(); log.makeAlert("There are no worker nodes with capacity to run task!").emit();
strategy.provisionIfNeeded(zkWorkers); strategy.provision(zkWorkers);
return null; return null;
} }

View File

@ -9,6 +9,7 @@ import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.merger.coordinator.WorkerWrapper; import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig; import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import java.util.Comparator; import java.util.Comparator;
import java.util.Map; import java.util.Map;
@ -35,7 +36,7 @@ public class NoopScalingStrategy implements ScalingStrategy
} }
@Override @Override
public void provisionIfNeeded(Map<String, WorkerWrapper> zkWorkers) public void provision(Map<String, WorkerWrapper> zkWorkers)
{ {
synchronized (lock) { synchronized (lock) {
if (currentlyProvisioning != null) { if (currentlyProvisioning != null) {
@ -48,18 +49,6 @@ public class NoopScalingStrategy implements ScalingStrategy
} }
} }
Iterable<WorkerWrapper> availableWorkers = FunctionalIterable.create(zkWorkers.values()).filter(
new Predicate<WorkerWrapper>()
{
@Override
public boolean apply(WorkerWrapper input)
{
return !input.isAtCapacity();
}
}
);
if (Iterables.size(availableWorkers) == 0) {
try { try {
log.info("If I were a real strategy I'd create something now"); log.info("If I were a real strategy I'd create something now");
currentlyProvisioning = "willNeverBeTrue"; currentlyProvisioning = "willNeverBeTrue";
@ -70,7 +59,6 @@ public class NoopScalingStrategy implements ScalingStrategy
} }
} }
} }
}
@Override @Override
public Instance terminateIfNeeded(Map<String, WorkerWrapper> zkWorkers) public Instance terminateIfNeeded(Map<String, WorkerWrapper> zkWorkers)
@ -89,9 +77,9 @@ public class NoopScalingStrategy implements ScalingStrategy
@Override @Override
public int compare(WorkerWrapper w1, WorkerWrapper w2) public int compare(WorkerWrapper w1, WorkerWrapper w2)
{ {
return Ordering.natural() DateTime w1Time = (w1 == null) ? new DateTime(0) : w1.getLastCompletedTaskTime();
.nullsFirst() DateTime w2Time = (w2 == null) ? new DateTime(0) : w2.getLastCompletedTaskTime();
.compare(w1.getLastCompletedTaskTime(), w2.getLastCompletedTaskTime()); return w1Time.compareTo(w2Time);
} }
} }
).create( ).create(

View File

@ -36,6 +36,7 @@ import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.merger.coordinator.WorkerWrapper; import com.metamx.druid.merger.coordinator.WorkerWrapper;
import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig; import com.metamx.druid.merger.coordinator.config.S3AutoScalingStrategyConfig;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
@ -65,7 +66,7 @@ public class S3AutoScalingStrategy implements ScalingStrategy
} }
@Override @Override
public void provisionIfNeeded(Map<String, WorkerWrapper> zkWorkers) public void provision(Map<String, WorkerWrapper> zkWorkers)
{ {
synchronized (lock) { synchronized (lock) {
if (zkWorkers.containsKey(currentlyProvisioning)) { if (zkWorkers.containsKey(currentlyProvisioning)) {
@ -80,18 +81,6 @@ public class S3AutoScalingStrategy implements ScalingStrategy
return; return;
} }
Iterable<WorkerWrapper> availableWorkers = FunctionalIterable.create(zkWorkers.values()).filter(
new Predicate<WorkerWrapper>()
{
@Override
public boolean apply(WorkerWrapper input)
{
return !input.isAtCapacity();
}
}
);
if (Iterables.size(availableWorkers) == 0) {
try { try {
log.info("Creating a new instance"); log.info("Creating a new instance");
RunInstancesResult result = amazonEC2Client.runInstances( RunInstancesResult result = amazonEC2Client.runInstances(
@ -115,7 +104,6 @@ public class S3AutoScalingStrategy implements ScalingStrategy
} }
} }
} }
}
@Override @Override
public Instance terminateIfNeeded(Map<String, WorkerWrapper> zkWorkers) public Instance terminateIfNeeded(Map<String, WorkerWrapper> zkWorkers)
@ -136,7 +124,9 @@ public class S3AutoScalingStrategy implements ScalingStrategy
@Override @Override
public int compare(WorkerWrapper w1, WorkerWrapper w2) public int compare(WorkerWrapper w1, WorkerWrapper w2)
{ {
return w1.getLastCompletedTaskTime().compareTo(w2.getLastCompletedTaskTime()); DateTime w1Time = (w1 == null) ? new DateTime(0) : w1.getLastCompletedTaskTime();
DateTime w2Time = (w2 == null) ? new DateTime(0) : w2.getLastCompletedTaskTime();
return w1Time.compareTo(w2Time);
} }
} }
).create( ).create(

View File

@ -28,7 +28,7 @@ import java.util.Map;
*/ */
public interface ScalingStrategy public interface ScalingStrategy
{ {
public void provisionIfNeeded(Map<String, WorkerWrapper> zkWorkers); public void provision(Map<String, WorkerWrapper> zkWorkers);
public Instance terminateIfNeeded(Map<String, WorkerWrapper> zkWorkers); public Instance terminateIfNeeded(Map<String, WorkerWrapper> zkWorkers);
} }

View File

@ -152,7 +152,7 @@ public class S3AutoScalingStrategyTest
Assert.assertTrue(worker.isAtCapacity()); Assert.assertTrue(worker.isAtCapacity());
strategy.provisionIfNeeded(zkWorkers); strategy.provision(zkWorkers);
worker.getRunningTasks().remove("task1"); worker.getRunningTasks().remove("task1");
worker.getRunningTasks().remove("task2"); worker.getRunningTasks().remove("task2");