diff --git a/docs/configuration/index.md b/docs/configuration/index.md index f01079254dc..dd0a4f2c0c5 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1015,6 +1015,7 @@ There are additional configs for autoscaling (if it is enabled): |`druid.indexer.autoscale.pendingTaskTimeout`|How long a task can be in "pending" state before the Overlord tries to scale up.|PT30S| |`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null| |`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080| +|`druid.indexer.autoscale.workerCapacityHint`| Worker capacity for determining the number of workers needed for auto scaling when there is currently no worker running. If unset or set to value of 0 or less, auto scaler will scale to `minNumWorkers` in autoScaler config instead. This value should typically be equal to `druid.worker.capacity` when you have a homogeneous cluster and the average of `druid.worker.capacity` across the workers when you have a heterogeneous cluster. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|-1| ##### Supervisors diff --git a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java index 3c8f5291501..e307b3217f7 100644 --- a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java +++ b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java @@ -80,8 +80,8 @@ public class GceAutoScaler implements AutoScaler @JsonProperty("envConfig") GceEnvironmentConfig envConfig ) { - Preconditions.checkArgument(minNumWorkers > 0, - "minNumWorkers must be greater than 0"); + Preconditions.checkArgument(minNumWorkers >= 0, + "minNumWorkers must be greater than or equal to 0"); this.minNumWorkers = minNumWorkers; Preconditions.checkArgument(maxNumWorkers > 0, "maxNumWorkers must be greater than 0"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java index 3ef70e93678..4ab98edc6d8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java @@ -29,6 +29,8 @@ public class PendingTaskBasedWorkerProvisioningConfig extends SimpleWorkerProvis @JsonProperty private int maxScalingStep = 10; + @JsonProperty + private int workerCapacityHint = -1; public int getMaxScalingStep() { @@ -76,4 +78,14 @@ public class PendingTaskBasedWorkerProvisioningConfig extends SimpleWorkerProvis return this; } + public int getWorkerCapacityHint() + { + return workerCapacityHint; + } + + public PendingTaskBasedWorkerProvisioningConfig setWorkerCapacityHint(int workerCapacityHint) + { + this.workerCapacityHint = workerCapacityHint; + return this; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index cf562f1dfa3..28d1bef3369 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.overlord.autoscaling; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; @@ -60,11 +61,14 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr { private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerProvisioningStrategy.class); + public static final String ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET = "As minNumWorkers is set to 0, workerCapacityHint must be greater than 0. workerCapacityHint value set is %d"; private static final String SCHEME = "http"; + @VisibleForTesting @Nullable - static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig( + public static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig( Supplier workerConfigRef, + SimpleWorkerProvisioningConfig config, String action, EmittingLogger log ) @@ -87,6 +91,13 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr log.error("No autoScaler available, cannot %s workers", action); return null; } + if (config instanceof PendingTaskBasedWorkerProvisioningConfig + && workerConfig.getAutoScaler().getMinNumWorkers() == 0 + && ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint() <= 0 + ) { + log.error(ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint()); + return null; + } return workerConfig; } @@ -157,7 +168,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr Collection workers = runner.getWorkers(); log.debug("Workers: %d %s", workers.size(), workers); boolean didProvision = false; - final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log); + final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config, "provision", log); if (workerConfig == null) { return false; } @@ -246,14 +257,19 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount); final int currValidWorkers = getCurrValidWorkers(workers); - // If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need - // since we are not aware of the expectedWorkerCapacity. - int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks( - remoteTaskRunnerConfig, - workerConfig, - pendingTasks, - workers - ); + // If there are no worker and workerCapacityHint config is not set (-1) or invalid (<= 0), then spin up minWorkerCount + // as we cannot determine the exact capacity here to fulfill the need. + // However, if there are no worker but workerCapacityHint config is set (>0), then we can + // determine the number of workers needed using workerCapacityHint config as expected worker capacity + int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityHint() <= 0 + ? minWorkerCount + : getWorkersNeededToAssignTasks( + remoteTaskRunnerConfig, + workerConfig, + pendingTasks, + workers, + config.getWorkerCapacityHint() + ); log.debug("More workers needed: %d", moreWorkersNeeded); int want = Math.max( @@ -280,7 +296,8 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr final WorkerTaskRunnerConfig workerTaskRunnerConfig, final DefaultWorkerBehaviorConfig workerConfig, final Collection pendingTasks, - final Collection workers + final Collection workers, + final int workerCapacityHint ) { final Collection validWorkers = Collections2.filter( @@ -295,7 +312,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr } WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy(); int need = 0; - int capacity = getExpectedWorkerCapacity(workers); + int capacity = getExpectedWorkerCapacity(workers, workerCapacityHint); log.info("Expected worker capacity: %d", capacity); // Simulate assigning tasks to dummy workers using configured workerSelectStrategy @@ -333,7 +350,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr { Collection zkWorkers = runner.getWorkers(); log.debug("Workers: %d [%s]", zkWorkers.size(), zkWorkers); - final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log); + final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config, "terminate", log); if (workerConfig == null) { return false; } @@ -441,12 +458,18 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr return currValidWorkers; } - private static int getExpectedWorkerCapacity(final Collection workers) + private static int getExpectedWorkerCapacity(final Collection workers, final int workerCapacityHint) { int size = workers.size(); if (size == 0) { - // No existing workers assume capacity per worker as 1 - return 1; + // No existing workers + if (workerCapacityHint > 0) { + // Return workerCapacityHint if it is set in config + return workerCapacityHint; + } else { + // Assume capacity per worker as 1 + return 1; + } } else { // Assume all workers have same capacity return workers.iterator().next().getWorker().getCapacity(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java index a17014c29c5..afdaa57c565 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java @@ -121,7 +121,7 @@ public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioning Collection workers = runner.getWorkers(); boolean didProvision = false; final DefaultWorkerBehaviorConfig workerConfig = - PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log); + PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config, "provision", log); if (workerConfig == null) { return false; } @@ -186,7 +186,7 @@ public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioning { Collection pendingTasks = runner.getPendingTasks(); final DefaultWorkerBehaviorConfig workerConfig = - PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log); + PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config, "terminate", log); if (workerConfig == null) { return false; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java index 45207940668..34af0cadc56 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.overlord.autoscaling; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.common.guava.DSuppliers; import org.apache.druid.indexer.TaskLocation; @@ -43,6 +44,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; +import org.easymock.Capture; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Period; @@ -106,10 +108,35 @@ public class PendingTaskBasedProvisioningStrategyTest ); } + @Test + public void testFailIfMinWorkerIsZeroAndWorkerHintNotSet() + { + EmittingLogger mockLogger = EasyMock.createMock(EmittingLogger.class); + Capture capturedArgument = Capture.newInstance(); + mockLogger.error(EasyMock.capture(capturedArgument), EasyMock.anyInt()); + + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.replay(autoScaler, mockLogger); + DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig( + DSuppliers.of(workerConfig), + config, + "test", + mockLogger + ); + Assert.assertNull(defaultWorkerBehaviorConfig); + Assert.assertEquals(PendingTaskBasedWorkerProvisioningStrategy.ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, capturedArgument.getValue()); + } + @Test public void testSuccessfulInitialMinWorkersProvision() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()); @@ -137,10 +164,105 @@ public class PendingTaskBasedProvisioningStrategyTest } } + @Test + public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldProvisionMinimumAsCurrentIsBelowMinimum() + { + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2) + .setWorkerCapacityHint(30); + strategy = new PendingTaskBasedWorkerProvisioningStrategy( + config, + DSuppliers.of(workerConfig), + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } + ); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) + .andReturn(new ArrayList()); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // No pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + new ArrayList<>() + ); + EasyMock.expect(runner.getWorkers()).andReturn( + Collections.emptyList() + ); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Collections.singletonList("aNode")) + ).times(3); + EasyMock.replay(runner, autoScaler); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); + Assert.assertTrue(provisionedSomething); + Assert.assertTrue(provisioner.getStats().toList().size() == 3); + for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) { + Assert.assertTrue( + event.getEvent() == ScalingStats.EVENT.PROVISION + ); + } + } + + @Test + public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldNotProvisionAsMinimumIsZero() + { + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2) + .setWorkerCapacityHint(30); + strategy = new PendingTaskBasedWorkerProvisioningStrategy( + config, + DSuppliers.of(workerConfig), + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } + ); + // minWorkerCount is 0 + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) + .andReturn(new ArrayList()); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // No pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + new ArrayList<>() + ); + EasyMock.expect(runner.getWorkers()).andReturn( + Collections.emptyList() + ); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); + EasyMock.replay(runner, autoScaler); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); + Assert.assertFalse(provisionedSomething); + Assert.assertEquals(0, provisioner.getStats().toList().size()); + } + @Test public void testSuccessfulMinWorkersProvision() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()); @@ -174,7 +296,7 @@ public class PendingTaskBasedProvisioningStrategyTest @Test public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()); @@ -207,9 +329,9 @@ public class PendingTaskBasedProvisioningStrategyTest } @Test - public void testSomethingProvisioning() + public void testProvisioning() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()).times(2); @@ -257,6 +379,153 @@ public class PendingTaskBasedProvisioningStrategyTest EasyMock.verify(runner); } + @Test + public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButNonEmptyCurrentlyRunningWorkerShouldUseCapcityFromRunningWorker() + { + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2) + .setWorkerCapacityHint(30); + strategy = new PendingTaskBasedWorkerProvisioningStrategy( + config, + DSuppliers.of(workerConfig), + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } + ); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) + .andReturn(new ArrayList()).times(2); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Collections.singletonList("fake")) + ).times(2); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // two pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + ImmutableList.of( + NoopTask.create(), + NoopTask.create() + ) + ).times(2); + // Capacity for current worker is 1 + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(testTask).toImmutable(), + new TestZkWorker(testTask, "http", "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node + ) + ).times(2); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1); + EasyMock.replay(runner); + EasyMock.replay(autoScaler); + + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); + + // Expect to use capacity from current worker (which is 1) + // and since there are two pending tasks, we will need two more workers + Assert.assertTrue(provisionedSomething); + Assert.assertEquals(2, provisioner.getStats().toList().size()); + DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); + Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent()); + Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(1).getEvent()); + + provisionedSomething = provisioner.doProvision(); + + Assert.assertFalse(provisionedSomething); + Assert.assertTrue( + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + createdTime.equals(anotherCreatedTime) + ); + + EasyMock.verify(autoScaler); + EasyMock.verify(runner); + } + + @Test + public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButEmptyCurrentlyRunningWorkerShouldUseCapcityFromHintConfig() + { + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2) + .setWorkerCapacityHint(30); + strategy = new PendingTaskBasedWorkerProvisioningStrategy( + config, + DSuppliers.of(workerConfig), + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } + ); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) + .andReturn(new ArrayList()).times(2); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Collections.singletonList("fake")) + ).times(1); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // two pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + ImmutableList.of( + NoopTask.create(), + NoopTask.create() + ) + ).times(2); + // No currently running worker node + EasyMock.expect(runner.getWorkers()).andReturn( + Collections.emptyList() + ).times(2); + + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1); + EasyMock.replay(runner); + EasyMock.replay(autoScaler); + + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); + + // Expect to use capacity from workerCapacityHint config (which is 30) + // and since there are two pending tasks, we will need one more worker + Assert.assertTrue(provisionedSomething); + Assert.assertEquals(1, provisioner.getStats().toList().size()); + DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); + Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent()); + + provisionedSomething = provisioner.doProvision(); + + Assert.assertFalse(provisionedSomething); + Assert.assertTrue( + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + createdTime.equals(anotherCreatedTime) + ); + + EasyMock.verify(autoScaler); + EasyMock.verify(runner); + } + @Test public void testProvisionAlert() throws Exception { @@ -266,7 +535,7 @@ public class PendingTaskBasedProvisioningStrategyTest EasyMock.expectLastCall(); EasyMock.replay(emitter); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()).times(2); @@ -323,7 +592,7 @@ public class PendingTaskBasedProvisioningStrategyTest @Test public void testDoSuccessfulTerminate() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()); EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn( @@ -343,7 +612,8 @@ public class PendingTaskBasedProvisioningStrategyTest ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( + ImmutableList.of( + new TestZkWorker(testTask).toImmutable(), new TestZkWorker(testTask).toImmutable() ) ).times(2); @@ -367,7 +637,7 @@ public class PendingTaskBasedProvisioningStrategyTest @Test public void testSomethingTerminating() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")).times(2); EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn( @@ -377,7 +647,9 @@ public class PendingTaskBasedProvisioningStrategyTest RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( + ImmutableList.of( + new TestZkWorker(testTask).toImmutable(), + new TestZkWorker(testTask).toImmutable(), new TestZkWorker(testTask).toImmutable() ) ).times(2); @@ -411,7 +683,7 @@ public class PendingTaskBasedProvisioningStrategyTest public void testNoActionNeeded() { EasyMock.reset(autoScaler); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")); EasyMock.replay(autoScaler); @@ -442,7 +714,7 @@ public class PendingTaskBasedProvisioningStrategyTest EasyMock.verify(autoScaler); EasyMock.reset(autoScaler); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")); @@ -460,7 +732,7 @@ public class PendingTaskBasedProvisioningStrategyTest { // Don't terminate anything EasyMock.reset(autoScaler); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")); EasyMock.replay(autoScaler); @@ -487,7 +759,7 @@ public class PendingTaskBasedProvisioningStrategyTest // Don't provision anything EasyMock.reset(autoScaler); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")); @@ -498,7 +770,7 @@ public class PendingTaskBasedProvisioningStrategyTest EasyMock.reset(autoScaler); // Increase minNumWorkers - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip"));