Autoscaling changes from code review.

- Log and return immediately when workerSetupData is null
- Allow provisioning more nodes while other nodes are still provisioning
- Add tests for bumping up the minimum version
This commit is contained in:
Gian Merlino 2013-12-20 08:59:35 -08:00
parent 0ee6136ea3
commit 4a722c0a6d
2 changed files with 195 additions and 23 deletions

View File

@ -90,6 +90,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
synchronized (lock) { synchronized (lock) {
boolean didProvision = false; boolean didProvision = false;
final WorkerSetupData workerSetupData = workerSetupDataRef.get(); final WorkerSetupData workerSetupData = workerSetupDataRef.get();
if (workerSetupData == null) {
log.warn("No workerSetupData available, cannot provision new workers.");
return false;
}
final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(workerSetupData); final Predicate<ZkWorker> isValidWorker = createValidWorkerPredicate(workerSetupData);
final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size(); final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size();
@ -112,21 +116,22 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
updateTargetWorkerCount(pendingTasks, zkWorkers); updateTargetWorkerCount(pendingTasks, zkWorkers);
if (currentlyProvisioning.isEmpty()) { int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size());
int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size()); while (want > 0) {
while (want > 0) { final AutoScalingData provisioned = autoScalingStrategy.provision();
final AutoScalingData provisioned = autoScalingStrategy.provision(); final List<String> newNodes;
if (provisioned == null) { if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) {
break; break;
} else { } else {
currentlyProvisioning.addAll(provisioned.getNodeIds()); currentlyProvisioning.addAll(newNodes);
lastProvisionTime = new DateTime(); lastProvisionTime = new DateTime();
scalingStats.addProvisionEvent(provisioned); scalingStats.addProvisionEvent(provisioned);
want -= provisioned.getNodeIds().size(); want -= provisioned.getNodeIds().size();
didProvision = true; didProvision = true;
}
} }
} else { }
if (!currentlyProvisioning.isEmpty()) {
Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime()); Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime());
log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision); log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision);
@ -151,6 +156,11 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers) public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{ {
synchronized (lock) { synchronized (lock) {
if (workerSetupDataRef.get() == null) {
log.warn("No workerSetupData available, cannot terminate workers.");
return false;
}
boolean didTerminate = false; boolean didTerminate = false;
final Set<String> workerNodeIds = Sets.newHashSet( final Set<String> workerNodeIds = Sets.newHashSet(
autoScalingStrategy.ipToIdLookup( autoScalingStrategy.ipToIdLookup(
@ -218,7 +228,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
} else { } else {
Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime()); Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime());
log.info("%s terminating. Current wait time: ", currentlyTerminating, durSinceLastTerminate); log.info("%s terminating. Current wait time: %s", currentlyTerminating, durSinceLastTerminate);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) { if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node termination taking too long!") log.makeAlert("Worker node termination taking too long!")
@ -265,22 +275,23 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
{ {
synchronized (lock) { synchronized (lock) {
final WorkerSetupData workerSetupData = workerSetupDataRef.get(); final WorkerSetupData workerSetupData = workerSetupDataRef.get();
final Collection<ZkWorker> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(workerSetupData)
);
if (targetWorkerCount < 0) { if (targetWorkerCount < 0) {
// Initialize to size of current worker pool // Initialize to size of current worker pool
targetWorkerCount = zkWorkers.size(); targetWorkerCount = zkWorkers.size();
log.info( log.info(
"Starting with %,d workers (min = %,d, max = %,d).", "Starting with a target of %,d workers (current = %,d, min = %,d, max = %,d).",
targetWorkerCount, targetWorkerCount,
validWorkers.size(),
workerSetupData.getMinNumWorkers(), workerSetupData.getMinNumWorkers(),
workerSetupData.getMaxNumWorkers() workerSetupData.getMaxNumWorkers()
); );
} }
final Collection<ZkWorker> validWorkers = Collections2.filter(
zkWorkers,
createValidWorkerPredicate(workerSetupData)
);
final boolean atSteadyState = currentlyProvisioning.isEmpty() final boolean atSteadyState = currentlyProvisioning.isEmpty()
&& currentlyTerminating.isEmpty() && currentlyTerminating.isEmpty()
&& validWorkers.size() == targetWorkerCount; && validWorkers.size() == targetWorkerCount;

View File

@ -19,6 +19,7 @@
package io.druid.indexing.overlord.scaling; package io.druid.indexing.overlord.scaling;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -28,6 +29,7 @@ import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.common.guava.DSuppliers; import io.druid.common.guava.DSuppliers;
import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import io.druid.indexing.overlord.ZkWorker; import io.druid.indexing.overlord.ZkWorker;
@ -63,7 +65,7 @@ public class SimpleResourceManagementStrategyTest
public void setUp() throws Exception public void setUp() throws Exception
{ {
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
workerSetupData = new AtomicReference<WorkerSetupData>( workerSetupData = new AtomicReference<>(
new WorkerSetupData( new WorkerSetupData(
"0", 0, 2, null, null, null "0", 0, 2, null, null, null
) )
@ -309,15 +311,174 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy); EasyMock.verify(autoScalingStrategy);
} }
@Test
public void testNoActionNeeded() throws Exception
{
  // Termination pass: one queued task and two workers that each hold a NoopTask.
  // The only interaction recorded on the mock is a single ipToIdLookup call.
  EasyMock.reset(autoScalingStrategy);
  EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList("ip"));
  EasyMock.replay(autoScalingStrategy);

  final List<RemoteTaskRunnerWorkItem> queuedForTerminate = Arrays.<RemoteTaskRunnerWorkItem>asList(
      new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
  );
  final List<ZkWorker> workersForTerminate = Arrays.<ZkWorker>asList(
      new TestZkWorker(NoopTask.create()),
      new TestZkWorker(NoopTask.create())
  );
  Assert.assertFalse(
      simpleResourceManagementStrategy.doTerminate(queuedForTerminate, workersForTerminate)
  );
  EasyMock.verify(autoScalingStrategy);

  // Provisioning pass: same shape of input, same single recorded ipToIdLookup call.
  // No provision() call is recorded, and doProvision is expected to report false.
  EasyMock.reset(autoScalingStrategy);
  EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
          .andReturn(Lists.<String>newArrayList("ip"));
  EasyMock.replay(autoScalingStrategy);

  final List<RemoteTaskRunnerWorkItem> queuedForProvision = Arrays.<RemoteTaskRunnerWorkItem>asList(
      new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
  );
  final List<ZkWorker> workersForProvision = Arrays.<ZkWorker>asList(
      new TestZkWorker(NoopTask.create()),
      new TestZkWorker(NoopTask.create())
  );
  Assert.assertFalse(
      simpleResourceManagementStrategy.doProvision(queuedForProvision, workersForProvision)
  );
  EasyMock.verify(autoScalingStrategy);
}
/**
 * Walks the strategy through four consecutive phases around a change of the
 * first WorkerSetupData argument from "0" to "1" (the in-test comment calls this
 * "minVersion"):
 * 1. doTerminate with two version-"0" workers holding a NoopTask: expected false.
 * 2. doProvision with two default-constructed workers: expected false.
 * 3. after the WorkerSetupData swap, doProvision with two version-"0" workers:
 *    two provision() calls are expected and the result must be true.
 * 4. doTerminate with two task-less version-"0" workers and two version-"1"
 *    workers: terminate(["i1", "i2"]) is expected and the result must be true.
 *
 * The phases share one strategy instance, so each one presumably depends on
 * state left behind by the earlier ones -- keep them in this order.
 */
@Test
public void testMinVersionIncrease() throws Exception
{
// Don't terminate anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
// NOTE(review): both workers are given host "h1" here, while later phases use
// "h1"/"h2" for the same ips -- possibly a typo for "h2"; confirm.
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h1", "i2", "0")
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
// Don't provision anything
// No provision() expectation is recorded for this phase, only ipToIdLookup.
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Increase minVersion
// Only the first constructor argument changes ("0" -> "1"); the remaining
// arguments match the WorkerSetupData built in setUp.
workerSetupData.set(new WorkerSetupData("1", 0, 2, null, null, null));
// Provision two new workers
// Each expected provision() call returns a single node id ("h3", then "h4").
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3"))
);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4"))
);
EasyMock.replay(autoScalingStrategy);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h2", "i2", "0")
)
);
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Terminate old workers
// Unlike the earlier phases, ipToIdLookup is matched on an exact argument list
// (all four ips), and terminate is expected for the two version-"0" ips only.
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(ImmutableList.of("i1", "i2", "i3", "i4"))).andReturn(
ImmutableList.of("h1", "h2", "h3", "h4")
);
EasyMock.expect(autoScalingStrategy.terminate(ImmutableList.of("i1", "i2"))).andReturn(
new AutoScalingData(ImmutableList.of("h1", "h2"))
);
EasyMock.replay(autoScalingStrategy);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
// The version-"0" workers are created with a null task; the version-"1"
// workers each hold a NoopTask.
new TestZkWorker(null, "h1", "i1", "0"),
new TestZkWorker(null, "h2", "i2", "0"),
new TestZkWorker(NoopTask.create(), "h3", "i3", "1"),
new TestZkWorker(NoopTask.create(), "h4", "i4", "1")
)
);
Assert.assertTrue(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testNullWorkerSetupData() throws Exception
{
  // Clear the shared WorkerSetupData reference before exercising the strategy.
  workerSetupData.set(null);
  // This method records no expectations of its own before switching the mock to replay.
  EasyMock.replay(autoScalingStrategy);

  final List<RemoteTaskRunnerWorkItem> pendingForTerminate = Arrays.<RemoteTaskRunnerWorkItem>asList(
      new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
  );
  final List<ZkWorker> workersForTerminate = Arrays.<ZkWorker>asList(new TestZkWorker(null));
  final boolean didTerminate =
      simpleResourceManagementStrategy.doTerminate(pendingForTerminate, workersForTerminate);

  final List<RemoteTaskRunnerWorkItem> pendingForProvision = Arrays.<RemoteTaskRunnerWorkItem>asList(
      new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime())
  );
  final List<ZkWorker> workersForProvision = Arrays.<ZkWorker>asList(new TestZkWorker(null));
  final boolean didProvision =
      simpleResourceManagementStrategy.doProvision(pendingForProvision, workersForProvision);

  // Both calls run before either result is checked; neither may report any action.
  Assert.assertFalse(didTerminate);
  Assert.assertFalse(didProvision);
  EasyMock.verify(autoScalingStrategy);
}
private static class TestZkWorker extends ZkWorker private static class TestZkWorker extends ZkWorker
{ {
private final Task testTask; private final Task testTask;
private TestZkWorker( public TestZkWorker(
Task testTask Task testTask
) )
{ {
super(new Worker("host", "ip", 3, "version"), null, new DefaultObjectMapper()); this(testTask, "host", "ip", "0");
}
public TestZkWorker(
Task testTask,
String host,
String ip,
String version
)
{
super(new Worker(host, ip, 3, version), null, new DefaultObjectMapper());
this.testTask = testTask; this.testTask = testTask;
} }