From 4a722c0a6d14cb172835c6f862d8c9b2c9053046 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 20 Dec 2013 08:59:35 -0800 Subject: [PATCH] Autoscaling changes from code review. - Log and return immediately when workerSetupData is null - Allow provisioning more nodes while other nodes are still provisioning - Add tests for bumping up the minimum version --- .../SimpleResourceManagementStrategy.java | 51 +++--- .../SimpleResourceManagementStrategyTest.java | 167 +++++++++++++++++- 2 files changed, 195 insertions(+), 23 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java index 68733fed9cf..4f541f113c7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategy.java @@ -90,6 +90,10 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat synchronized (lock) { boolean didProvision = false; final WorkerSetupData workerSetupData = workerSetupDataRef.get(); + if (workerSetupData == null) { + log.warn("No workerSetupData available, cannot provision new workers."); + return false; + } final Predicate isValidWorker = createValidWorkerPredicate(workerSetupData); final int currValidWorkers = Collections2.filter(zkWorkers, isValidWorker).size(); @@ -112,21 +116,22 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat updateTargetWorkerCount(pendingTasks, zkWorkers); - if (currentlyProvisioning.isEmpty()) { - int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size()); - while (want > 0) { - final AutoScalingData provisioned = autoScalingStrategy.provision(); - if (provisioned == null) { - break; - } else { - 
currentlyProvisioning.addAll(provisioned.getNodeIds()); - lastProvisionTime = new DateTime(); - scalingStats.addProvisionEvent(provisioned); - want -= provisioned.getNodeIds().size(); - didProvision = true; - } + int want = targetWorkerCount - (currValidWorkers + currentlyProvisioning.size()); + while (want > 0) { + final AutoScalingData provisioned = autoScalingStrategy.provision(); + final List newNodes; + if (provisioned == null || (newNodes = provisioned.getNodeIds()).isEmpty()) { + break; + } else { + currentlyProvisioning.addAll(newNodes); + lastProvisionTime = new DateTime(); + scalingStats.addProvisionEvent(provisioned); + want -= provisioned.getNodeIds().size(); + didProvision = true; } - } else { + } + + if (!currentlyProvisioning.isEmpty()) { Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime()); log.info("%s provisioning. Current wait time: %s", currentlyProvisioning, durSinceLastProvision); @@ -151,6 +156,11 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat public boolean doTerminate(Collection pendingTasks, Collection zkWorkers) { synchronized (lock) { + if (workerSetupDataRef.get() == null) { + log.warn("No workerSetupData available, cannot terminate workers."); + return false; + } + boolean didTerminate = false; final Set workerNodeIds = Sets.newHashSet( autoScalingStrategy.ipToIdLookup( @@ -218,7 +228,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } else { Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime()); - log.info("%s terminating. Current wait time: ", currentlyTerminating, durSinceLastTerminate); + log.info("%s terminating. 
Current wait time: %s", currentlyTerminating, durSinceLastTerminate); if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) { log.makeAlert("Worker node termination taking too long!") @@ -265,22 +275,23 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat { synchronized (lock) { final WorkerSetupData workerSetupData = workerSetupDataRef.get(); + final Collection validWorkers = Collections2.filter( + zkWorkers, + createValidWorkerPredicate(workerSetupData) + ); if (targetWorkerCount < 0) { // Initialize to size of current worker pool targetWorkerCount = zkWorkers.size(); log.info( - "Starting with %,d workers (min = %,d, max = %,d).", + "Starting with a target of %,d workers (current = %,d, min = %,d, max = %,d).", targetWorkerCount, + validWorkers.size(), workerSetupData.getMinNumWorkers(), workerSetupData.getMaxNumWorkers() ); } - final Collection validWorkers = Collections2.filter( - zkWorkers, - createValidWorkerPredicate(workerSetupData) - ); final boolean atSteadyState = currentlyProvisioning.isEmpty() && currentlyTerminating.isEmpty() && validWorkers.size() == targetWorkerCount; diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java index 10c7bf77882..6ffc6ae6222 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/scaling/SimpleResourceManagementStrategyTest.java @@ -19,6 +19,7 @@ package io.druid.indexing.overlord.scaling; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -28,6 +29,7 @@ import com.metamx.emitter.service.ServiceEventBuilder; import 
io.druid.common.guava.DSuppliers; import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import io.druid.indexing.overlord.ZkWorker; @@ -63,7 +65,7 @@ public class SimpleResourceManagementStrategyTest public void setUp() throws Exception { autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class); - workerSetupData = new AtomicReference( + workerSetupData = new AtomicReference<>( new WorkerSetupData( "0", 0, 2, null, null, null ) @@ -309,15 +311,174 @@ public class SimpleResourceManagementStrategyTest EasyMock.verify(autoScalingStrategy); } + @Test + public void testNoActionNeeded() throws Exception + { + EasyMock.reset(autoScalingStrategy); + EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) + .andReturn(Lists.newArrayList("ip")); + EasyMock.replay(autoScalingStrategy); + + boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + ), + Arrays.asList( + new TestZkWorker(NoopTask.create()), + new TestZkWorker(NoopTask.create()) + ) + ); + + Assert.assertFalse(terminatedSomething); + EasyMock.verify(autoScalingStrategy); + + EasyMock.reset(autoScalingStrategy); + EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) + .andReturn(Lists.newArrayList("ip")); + EasyMock.replay(autoScalingStrategy); + + boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + ), + Arrays.asList( + new TestZkWorker(NoopTask.create()), + new TestZkWorker(NoopTask.create()) + ) + ); + + Assert.assertFalse(provisionedSomething); + EasyMock.verify(autoScalingStrategy); + } + + @Test + 
public void testMinVersionIncrease() throws Exception + { + // Don't terminate anything + EasyMock.reset(autoScalingStrategy); + EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) + .andReturn(Lists.newArrayList("ip")); + EasyMock.replay(autoScalingStrategy); + boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( + Arrays.asList(), + Arrays.asList( + new TestZkWorker(NoopTask.create(), "h1", "i1", "0"), + new TestZkWorker(NoopTask.create(), "h1", "i2", "0") + ) + ); + Assert.assertFalse(terminatedSomething); + EasyMock.verify(autoScalingStrategy); + + // Don't provision anything + EasyMock.reset(autoScalingStrategy); + EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) + .andReturn(Lists.newArrayList("ip")); + EasyMock.replay(autoScalingStrategy); + boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( + Arrays.asList(), + Arrays.asList( + new TestZkWorker(NoopTask.create()), + new TestZkWorker(NoopTask.create()) + ) + ); + Assert.assertFalse(provisionedSomething); + EasyMock.verify(autoScalingStrategy); + + // Increase minVersion + workerSetupData.set(new WorkerSetupData("1", 0, 2, null, null, null)); + + // Provision two new workers + EasyMock.reset(autoScalingStrategy); + EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject())) + .andReturn(Lists.newArrayList("ip")); + EasyMock.expect(autoScalingStrategy.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("h3")) + ); + EasyMock.expect(autoScalingStrategy.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("h4")) + ); + EasyMock.replay(autoScalingStrategy); + provisionedSomething = simpleResourceManagementStrategy.doProvision( + Arrays.asList(), + Arrays.asList( + new TestZkWorker(NoopTask.create(), "h1", "i1", "0"), + new TestZkWorker(NoopTask.create(), "h2", "i2", "0") + ) + ); + Assert.assertTrue(provisionedSomething); + EasyMock.verify(autoScalingStrategy); + + // Terminate old 
workers + EasyMock.reset(autoScalingStrategy); + EasyMock.expect(autoScalingStrategy.ipToIdLookup(ImmutableList.of("i1", "i2", "i3", "i4"))).andReturn( + ImmutableList.of("h1", "h2", "h3", "h4") + ); + EasyMock.expect(autoScalingStrategy.terminate(ImmutableList.of("i1", "i2"))).andReturn( + new AutoScalingData(ImmutableList.of("h1", "h2")) + ); + EasyMock.replay(autoScalingStrategy); + terminatedSomething = simpleResourceManagementStrategy.doTerminate( + Arrays.asList(), + Arrays.asList( + new TestZkWorker(null, "h1", "i1", "0"), + new TestZkWorker(null, "h2", "i2", "0"), + new TestZkWorker(NoopTask.create(), "h3", "i3", "1"), + new TestZkWorker(NoopTask.create(), "h4", "i4", "1") + ) + ); + Assert.assertTrue(terminatedSomething); + EasyMock.verify(autoScalingStrategy); + } + + @Test + public void testNullWorkerSetupData() throws Exception + { + workerSetupData.set(null); + EasyMock.replay(autoScalingStrategy); + + boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + ), + Arrays.asList( + new TestZkWorker(null) + ) + ); + + boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask.getId(), null, null).withQueueInsertionTime(new DateTime()) + ), + Arrays.asList( + new TestZkWorker(null) + ) + ); + + Assert.assertFalse(terminatedSomething); + Assert.assertFalse(provisionedSomething); + + EasyMock.verify(autoScalingStrategy); + } + private static class TestZkWorker extends ZkWorker { private final Task testTask; - private TestZkWorker( + public TestZkWorker( Task testTask ) { - super(new Worker("host", "ip", 3, "version"), null, new DefaultObjectMapper()); + this(testTask, "host", "ip", "0"); + } + + public TestZkWorker( + Task testTask, + String host, + String ip, + String version + ) + { + super(new Worker(host, ip, 3, version), null, new 
DefaultObjectMapper()); this.testTask = testTask; }