SimpleResourceManagementStrategy: Scale up to minWorkerCount when increased

This commit is contained in:
Gian Merlino 2014-02-06 13:20:01 -08:00
parent 9756b18f9b
commit 5ec634e498
2 changed files with 78 additions and 16 deletions

View File

@ -297,22 +297,28 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
createValidWorkerPredicate(config, workerSetupData) createValidWorkerPredicate(config, workerSetupData)
); );
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData); final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config, workerSetupData);
final int minWorkerCount = workerSetupData.getMinNumWorkers();
final int maxWorkerCount = workerSetupData.getMaxNumWorkers();
if (minWorkerCount > maxWorkerCount) {
log.info("Huh? minWorkerCount[%d] > maxWorkerCount[%d]. I give up!", minWorkerCount, maxWorkerCount);
}
if (targetWorkerCount < 0) { if (targetWorkerCount < 0) {
// Initialize to size of current worker pool, subject to pool size limits // Initialize to size of current worker pool, subject to pool size limits
targetWorkerCount = Math.max( targetWorkerCount = Math.max(
Math.min( Math.min(
zkWorkers.size(), zkWorkers.size(),
workerSetupData.getMaxNumWorkers() maxWorkerCount
), ),
workerSetupData.getMinNumWorkers() minWorkerCount
); );
log.info( log.info(
"Starting with a target of %,d workers (current = %,d, min = %,d, max = %,d).", "Starting with a target of %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount, targetWorkerCount,
validWorkers.size(), validWorkers.size(),
workerSetupData.getMinNumWorkers(), minWorkerCount,
workerSetupData.getMaxNumWorkers() maxWorkerCount
); );
} }
@ -320,36 +326,37 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
&& currentlyTerminating.isEmpty() && currentlyTerminating.isEmpty()
&& validWorkers.size() == targetWorkerCount; && validWorkers.size() == targetWorkerCount;
final boolean shouldScaleUp = atSteadyState final boolean shouldScaleUp = atSteadyState
&& hasTaskPendingBeyondThreshold(pendingTasks) && targetWorkerCount < maxWorkerCount
&& targetWorkerCount < workerSetupData.getMaxNumWorkers(); && (hasTaskPendingBeyondThreshold(pendingTasks)
|| targetWorkerCount < minWorkerCount);
final boolean shouldScaleDown = atSteadyState final boolean shouldScaleDown = atSteadyState
&& Iterables.any(validWorkers, isLazyWorker) && targetWorkerCount > minWorkerCount
&& targetWorkerCount > workerSetupData.getMinNumWorkers(); && Iterables.any(validWorkers, isLazyWorker);
if (shouldScaleUp) { if (shouldScaleUp) {
targetWorkerCount++; targetWorkerCount = Math.max(targetWorkerCount + 1, minWorkerCount);
log.info( log.info(
"I think we should scale up to %,d workers (current = %,d, min = %,d, max = %,d).", "I think we should scale up to %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount, targetWorkerCount,
validWorkers.size(), validWorkers.size(),
workerSetupData.getMinNumWorkers(), minWorkerCount,
workerSetupData.getMaxNumWorkers() maxWorkerCount
); );
} else if (shouldScaleDown) { } else if (shouldScaleDown) {
targetWorkerCount--; targetWorkerCount = Math.min(targetWorkerCount - 1, maxWorkerCount);
log.info( log.info(
"I think we should scale down to %,d workers (current = %,d, min = %,d, max = %,d).", "I think we should scale down to %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount, targetWorkerCount,
validWorkers.size(), validWorkers.size(),
workerSetupData.getMinNumWorkers(), minWorkerCount,
workerSetupData.getMaxNumWorkers() maxWorkerCount
); );
} else { } else {
log.info( log.info(
"Our target is %,d workers, and I'm okay with that (current = %,d, min = %,d, max = %,d).", "Our target is %,d workers, and I'm okay with that (current = %,d, min = %,d, max = %,d).",
targetWorkerCount, targetWorkerCount,
validWorkers.size(), validWorkers.size(),
workerSetupData.getMinNumWorkers(), minWorkerCount,
workerSetupData.getMaxNumWorkers() maxWorkerCount
); );
} }
} }

View File

@ -349,6 +349,61 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScalingStrategy);
} }
@Test
public void testMinCountIncrease() throws Exception
{
// Don't terminate anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0")
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
// Don't provision anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Increase minNumWorkers
workerSetupData.set(new WorkerSetupData("0", 3, 5, null, null, null));
// Should provision two new workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3"))
);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4"))
);
EasyMock.replay(autoScalingStrategy);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0")
)
);
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
}
@Test @Test
public void testMinVersionIncrease() throws Exception public void testMinVersionIncrease() throws Exception
{ {