Autoscaling: Move target count independent of actual count.

This should let us grow and shrink the worker pool in chunks when necessary
(like when a bunch of them go offline, or when there is a worker version
change).
This commit is contained in:
Gian Merlino 2013-12-19 16:11:30 -08:00
parent fba6caf7fd
commit 1f4b99634f
6 changed files with 244 additions and 194 deletions

View File

@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -251,26 +252,26 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@Override @Override
public Collection<ZkWorker> getWorkers() public Collection<ZkWorker> getWorkers()
{ {
return zkWorkers.values(); return ImmutableList.copyOf(zkWorkers.values());
} }
@Override @Override
public Collection<RemoteTaskRunnerWorkItem> getRunningTasks() public Collection<RemoteTaskRunnerWorkItem> getRunningTasks()
{ {
return runningTasks.values(); return ImmutableList.copyOf(runningTasks.values());
} }
@Override @Override
public Collection<RemoteTaskRunnerWorkItem> getPendingTasks() public Collection<RemoteTaskRunnerWorkItem> getPendingTasks()
{ {
return pendingTasks.values(); return ImmutableList.copyOf(pendingTasks.values());
} }
@Override @Override
public Collection<RemoteTaskRunnerWorkItem> getKnownTasks() public Collection<RemoteTaskRunnerWorkItem> getKnownTasks()
{ {
// Racey, since there is a period of time during assignment when a task is neither pending nor running // Racey, since there is a period of time during assignment when a task is neither pending nor running
return Lists.newArrayList(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values())); return ImmutableList.copyOf(Iterables.concat(pendingTasks.values(), runningTasks.values(), completeTasks.values()));
} }
public ZkWorker findWorkerRunningTask(String taskId) public ZkWorker findWorkerRunningTask(String taskId)

View File

@ -28,12 +28,10 @@ import java.util.List;
public class AutoScalingData public class AutoScalingData
{ {
private final List<String> nodeIds; private final List<String> nodeIds;
private final List nodes;
public AutoScalingData(List<String> nodeIds, List nodes) public AutoScalingData(List<String> nodeIds)
{ {
this.nodeIds = nodeIds; this.nodeIds = nodeIds;
this.nodes = nodes;
} }
@JsonProperty @JsonProperty
@ -42,17 +40,11 @@ public class AutoScalingData
return nodeIds; return nodeIds;
} }
public List getNodes()
{
return nodes;
}
@Override @Override
public String toString() public String toString()
{ {
return "AutoScalingData{" + return "AutoScalingData{" +
"nodeIds=" + nodeIds + "nodeIds=" + nodeIds +
", nodes=" + nodes +
'}'; '}';
} }
} }

View File

@ -125,8 +125,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
return input.getInstanceId(); return input.getInstanceId();
} }
} }
), )
result.getReservation().getInstances()
); );
} }
catch (Exception e) { catch (Exception e) {
@ -140,7 +139,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
public AutoScalingData terminate(List<String> ips) public AutoScalingData terminate(List<String> ips)
{ {
if (ips.isEmpty()) { if (ips.isEmpty()) {
return new AutoScalingData(Lists.<String>newArrayList(), Lists.<Instance>newArrayList()); return new AutoScalingData(Lists.<String>newArrayList());
} }
DescribeInstancesResult result = amazonEC2Client.describeInstances( DescribeInstancesResult result = amazonEC2Client.describeInstances(
@ -184,8 +183,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy
return String.format("%s:%s", input, config.getWorkerPort()); return String.format("%s:%s", input, config.getWorkerPort());
} }
} }
), )
instances
); );
} }
catch (Exception e) { catch (Exception e) {

View File

@ -19,14 +19,17 @@
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.scaling;
import com.google.api.client.repackaged.com.google.common.base.Joiner;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskRunnerWorkItem; import io.druid.indexing.overlord.TaskRunnerWorkItem;
@ -38,7 +41,6 @@ import org.joda.time.Duration;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
/** /**
*/ */
@ -48,211 +50,188 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private final AutoScalingStrategy autoScalingStrategy; private final AutoScalingStrategy autoScalingStrategy;
private final SimpleResourceManagementConfig config; private final SimpleResourceManagementConfig config;
private final Supplier<WorkerSetupData> workerSetupdDataRef; private final Supplier<WorkerSetupData> workerSetupDataRef;
private final ScalingStats scalingStats; private final ScalingStats scalingStats;
private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>(); private final Object lock = new Object();
private final ConcurrentSkipListSet<String> currentlyTerminating = new ConcurrentSkipListSet<String>(); private final Set<String> currentlyProvisioning = Sets.newHashSet();
private final Set<String> currentlyTerminating = Sets.newHashSet();
private final Predicate<ZkWorker> isLazyWorker = new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker input)
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
}
};
private volatile DateTime lastProvisionTime = new DateTime(); private int targetWorkerCount = -1;
private volatile DateTime lastTerminateTime = new DateTime(); private DateTime lastProvisionTime = new DateTime();
private DateTime lastTerminateTime = new DateTime();
@Inject @Inject
public SimpleResourceManagementStrategy( public SimpleResourceManagementStrategy(
AutoScalingStrategy autoScalingStrategy, AutoScalingStrategy autoScalingStrategy,
SimpleResourceManagementConfig config, SimpleResourceManagementConfig config,
Supplier<WorkerSetupData> workerSetupdDataRef Supplier<WorkerSetupData> workerSetupDataRef
) )
{ {
this.autoScalingStrategy = autoScalingStrategy; this.autoScalingStrategy = autoScalingStrategy;
this.config = config; this.config = config;
this.workerSetupdDataRef = workerSetupdDataRef; this.workerSetupDataRef = workerSetupDataRef;
this.scalingStats = new ScalingStats(config.getNumEventsToTrack()); this.scalingStats = new ScalingStats(config.getNumEventsToTrack());
} }
@Override @Override
public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers) public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{ {
final WorkerSetupData workerSetupData = workerSetupdDataRef.get(); synchronized (lock) {
boolean didProvision = false;
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(workerSetupData);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
final String minVersion = workerSetupData.getMinVersion() == null final List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
? config.getWorkerVersion() Lists.newArrayList(
: workerSetupData.getMinVersion(); Iterables.<ZkWorker, String>transform(
int maxNumWorkers = workerSetupData.getMaxNumWorkers(); zkWorkers,
new Function<ZkWorker, String>()
int currValidWorkers = 0;
for (ZkWorker zkWorker : zkWorkers) {
if (zkWorker.isValidVersion(minVersion)) {
currValidWorkers++;
}
}
if (currValidWorkers >= maxNumWorkers) {
log.debug(
"Cannot scale anymore. Num workers = %d, Max num workers = %d",
zkWorkers.size(),
workerSetupdDataRef.get().getMaxNumWorkers()
);
return false;
}
List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.<ZkWorker, String>transform(
zkWorkers,
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker input)
{ {
return input.getWorker().getIp(); @Override
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
} }
} )
) )
) );
); currentlyProvisioning.removeAll(workerNodeIds);
currentlyProvisioning.removeAll(workerNodeIds); updateTargetWorkerCount(pendingTasks, zkWorkers);
boolean nothingProvisioning = currentlyProvisioning.isEmpty();
if (nothingProvisioning) { if (currentlyProvisioning.isEmpty()) {
if (hasTaskPendingBeyondThreshold(pendingTasks)) { int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
AutoScalingData provisioned = autoScalingStrategy.provision(); while (want > 0) {
final AutoScalingData provisioned = autoScalingStrategy.provision();
if (provisioned == null) {
break;
} else {
currentlyProvisioning.addAll(provisioned.getNodeIds());
lastProvisionTime = new DateTime();
scalingStats.addProvisionEvent(provisioned);
want -= provisioned.getNodeIds().size();
didProvision = true;
}
}
} else {
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
if (provisioned != null) { log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision);
currentlyProvisioning.addAll(provisioned.getNodeIds());
lastProvisionTime = new DateTime();
scalingStats.addProvisionEvent(provisioned);
return true; if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node provisioning taking too long!")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
List<String> nodeIps = autoScalingStrategy.idToIpLookup(Lists.newArrayList(currentlyProvisioning));
autoScalingStrategy.terminate(nodeIps);
currentlyProvisioning.clear();
} }
} }
} else {
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
log.info( return didProvision;
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker. Current wait time: %s",
currentlyProvisioning,
durSinceLastProvision
);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node provisioning taking too long!")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
.emit();
List<String> nodeIps = autoScalingStrategy.idToIpLookup(Lists.newArrayList(currentlyProvisioning));
autoScalingStrategy.terminate(nodeIps);
currentlyProvisioning.clear();
}
} }
return false;
} }
@Override @Override
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers) public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{ {
Set<String> workerNodeIds = Sets.newHashSet( synchronized (lock) {
autoScalingStrategy.ipToIdLookup( boolean didTerminate = false;
Lists.newArrayList( final Set<String> workerNodeIds = Sets.newHashSet(
Iterables.transform( autoScalingStrategy.ipToIdLookup(
zkWorkers, Lists.newArrayList(
new Function<ZkWorker, String>() Iterables.transform(
{ zkWorkers,
@Override new Function<ZkWorker, String>()
public String apply(ZkWorker input)
{ {
return input.getWorker().getIp(); @Override
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
} }
} )
)
)
)
);
Set<String> stillExisting = Sets.newHashSet();
for (String s : currentlyTerminating) {
if (workerNodeIds.contains(s)) {
stillExisting.add(s);
}
}
currentlyTerminating.clear();
currentlyTerminating.addAll(stillExisting);
boolean nothingTerminating = currentlyTerminating.isEmpty();
if (nothingTerminating) {
final int minNumWorkers = workerSetupdDataRef.get().getMinNumWorkers();
if (zkWorkers.size() <= minNumWorkers) {
log.info("Only [%d <= %d] nodes in the cluster, not terminating anything.", zkWorkers.size(), minNumWorkers);
return false;
}
List<ZkWorker> thoseLazyWorkers = Lists.newArrayList(
FunctionalIterable
.create(zkWorkers)
.filter(
new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker input)
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
>= config.getWorkerIdleTimeout().toStandardDuration().getMillis();
}
}
) )
);
int maxPossibleNodesTerminated = zkWorkers.size() - minNumWorkers;
int numNodesToTerminate = Math.min(maxPossibleNodesTerminated, thoseLazyWorkers.size());
if (numNodesToTerminate <= 0) {
log.info("Found no nodes to terminate.");
return false;
}
AutoScalingData terminated = autoScalingStrategy.terminate(
Lists.transform(
thoseLazyWorkers.subList(0, numNodesToTerminate),
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker input)
{
return input.getWorker().getIp();
}
}
) )
); );
if (terminated != null) { final Set<String> stillExisting = Sets.newHashSet();
currentlyTerminating.addAll(terminated.getNodeIds()); for (String s : currentlyTerminating) {
lastTerminateTime = new DateTime(); if (workerNodeIds.contains(s)) {
scalingStats.addTerminateEvent(terminated); stillExisting.add(s);
}
return true;
} }
} else { currentlyTerminating.clear();
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime()); currentlyTerminating.addAll(stillExisting);
log.info( updateTargetWorkerCount(pendingTasks, zkWorkers);
"%s still terminating. Wait for all nodes to terminate before trying again.",
currentlyTerminating
);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) { if (currentlyTerminating.isEmpty()) {
log.makeAlert("Worker node termination taking too long!") final int want = zkWorkers.size() - targetWorkerCount;
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis()) if (want > 0) {
.addData("terminatingCount", currentlyTerminating.size()) final List<String> laziestWorkerIps =
.emit(); FluentIterable.from(zkWorkers)
.filter(isLazyWorker)
.limit(want)
.transform(
new Function<ZkWorker, String>()
{
@Override
public String apply(ZkWorker zkWorker)
{
return zkWorker.getWorker().getIp();
}
}
)
.toList();
currentlyTerminating.clear(); log.info(
"Terminating %,d workers (wanted %,d): %s",
laziestWorkerIps.size(),
want,
Joiner.on(", ").join(laziestWorkerIps)
);
final AutoScalingData terminated = autoScalingStrategy.terminate(laziestWorkerIps);
if (terminated != null) {
currentlyTerminating.addAll(terminated.getNodeIds());
lastTerminateTime = new DateTime();
scalingStats.addTerminateEvent(terminated);
didTerminate = true;
}
}
} else {
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());
log.info("%s terminating. Current wait time: ", currentlyTerminating, durSinceLastTerminate);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node termination taking too long!")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
.emit();
currentlyTerminating.clear();
}
} }
return didTerminate;
} }
return false;
} }
@Override @Override
@ -261,16 +240,98 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return scalingStats; return scalingStats;
} }
private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks) private Predicate<ZkWorker> createValidWorkerPredicate(final WorkerSetupData workerSetupData)
{ {
long now = System.currentTimeMillis(); return new Predicate<ZkWorker>()
for (TaskRunnerWorkItem pendingTask : pendingTasks) { {
final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now); @Override
final Duration timeoutDuration = config.getPendingTaskTimeout().toStandardDuration(); public boolean apply(ZkWorker zkWorker)
if (durationSinceInsertion.isEqual(timeoutDuration) || durationSinceInsertion.isLongerThan(timeoutDuration)) { {
return true; final String minVersion = workerSetupData.getMinVersion() != null
? workerSetupData.getMinVersion()
: config.getWorkerVersion();
if (minVersion == null) {
throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
}
return zkWorker.isValidVersion(minVersion);
}
};
}
private void updateTargetWorkerCount(
final Collection<RemoteTaskRunnerWorkItem> pendingTasks,
final Collection<ZkWorker> zkWorkers
)
{
synchronized (lock) {
final WorkerSetupData workerSetupData = workerSetupDataRef.get();
if (targetWorkerCount < 0) {
// Initialize to size of current worker pool
targetWorkerCount = zkWorkers.size();
log.info(
"Starting with %,d workers (min = %,d, max = %,d).",
targetWorkerCount,
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
}
final Collection<ZkWorker> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(workerSetupData)
);
final boolean atSteadyState = currentlyProvisioning.isEmpty()
&& currentlyTerminating.isEmpty()
&& validWorkers.size() == targetWorkerCount;
final boolean shouldScaleUp = atSteadyState
&& hasTaskPendingBeyondThreshold(pendingTasks)
&& targetWorkerCount < workerSetupData.getMaxNumWorkers();
final boolean shouldScaleDown = atSteadyState
&& Iterables.any(validWorkers, isLazyWorker)
&& targetWorkerCount > workerSetupData.getMinNumWorkers();
if (shouldScaleUp) {
targetWorkerCount++;
log.info(
"I think we should scale up to %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
} else if (shouldScaleDown) {
targetWorkerCount--;
log.info(
"I think we should scale down to %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
} else {
log.info(
"Our target is %,d workers, and I'm okay with that (current = %,d, min = %,d, max = %,d).",
targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers()
);
} }
} }
return false; }
private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks)
{
synchronized (lock) {
long now = System.currentTimeMillis();
for (TaskRunnerWorkItem pendingTask : pendingTasks) {
final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
final Duration timeoutDuration = config.getPendingTaskTimeout().toStandardDuration();
if (durationSinceInsertion.isEqual(timeoutDuration) || durationSinceInsertion.isLongerThan(timeoutDuration)) {
return true;
}
}
return false;
}
} }
} }

View File

@ -126,13 +126,11 @@ public class EC2AutoScalingStrategyTest
AutoScalingData created = strategy.provision(); AutoScalingData created = strategy.provision();
Assert.assertEquals(created.getNodeIds().size(), 1); Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals(created.getNodes().size(), 1);
Assert.assertEquals("theInstance", created.getNodeIds().get(0)); Assert.assertEquals("theInstance", created.getNodeIds().get(0));
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP")); AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));
Assert.assertEquals(deleted.getNodeIds().size(), 1); Assert.assertEquals(deleted.getNodeIds().size(), 1);
Assert.assertEquals(deleted.getNodes().size(), 1);
Assert.assertEquals(String.format("%s:8080", IP), deleted.getNodeIds().get(0)); Assert.assertEquals(String.format("%s:8080", IP), deleted.getNodeIds().get(0));
} }
} }

View File

@ -105,7 +105,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()); .andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList(), Lists.newArrayList()) new AutoScalingData(Lists.<String>newArrayList("aNode"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
@ -133,7 +133,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()).times(2); .andReturn(Lists.<String>newArrayList()).times(2);
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"), Lists.newArrayList("faker")) new AutoScalingData(Lists.<String>newArrayList("fake"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
@ -190,7 +190,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject()))
.andReturn(null); .andReturn(null);
EasyMock.expect(autoScalingStrategy.provision()).andReturn( EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("fake"), Lists.newArrayList("faker")) new AutoScalingData(Lists.<String>newArrayList("fake"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
@ -242,7 +242,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList()); .andReturn(Lists.<String>newArrayList());
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn( EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList(), Lists.newArrayList()) new AutoScalingData(Lists.<String>newArrayList())
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
@ -272,7 +272,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2); .andReturn(Lists.<String>newArrayList("ip")).times(2);
EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn( EasyMock.expect(autoScalingStrategy.terminate(EasyMock.<List<String>>anyObject())).andReturn(
new AutoScalingData(Lists.<String>newArrayList("ip"), Lists.newArrayList("ip")) new AutoScalingData(Lists.<String>newArrayList("ip"))
); );
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);