mirror of https://github.com/apache/druid.git
Improve Auto scaler pendingTaskBased provisioning strategy to handle when there are no currently running worker node better (#11440)
* fix pendingTaskBased * fix doc * address comments * address comments * address comments * address comments * address comments * address comments * address comments
This commit is contained in:
parent
d3e82b1114
commit
8d7d60d18e
|
@ -1015,6 +1015,7 @@ There are additional configs for autoscaling (if it is enabled):
|
|||
|`druid.indexer.autoscale.pendingTaskTimeout`|How long a task can be in "pending" state before the Overlord tries to scale up.|PT30S|
|
||||
|`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null|
|
||||
|`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080|
|
||||
|`druid.indexer.autoscale.workerCapacityHint`| Worker capacity for determining the number of workers needed for auto scaling when there is currently no worker running. If unset or set to value of 0 or less, auto scaler will scale to `minNumWorkers` in autoScaler config instead. This value should typically be equal to `druid.worker.capacity` when you have a homogeneous cluster and the average of `druid.worker.capacity` across the workers when you have a heterogeneous cluster. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|-1|
|
||||
|
||||
##### Supervisors
|
||||
|
||||
|
|
|
@ -80,8 +80,8 @@ public class GceAutoScaler implements AutoScaler<GceEnvironmentConfig>
|
|||
@JsonProperty("envConfig") GceEnvironmentConfig envConfig
|
||||
)
|
||||
{
|
||||
Preconditions.checkArgument(minNumWorkers > 0,
|
||||
"minNumWorkers must be greater than 0");
|
||||
Preconditions.checkArgument(minNumWorkers >= 0,
|
||||
"minNumWorkers must be greater than or equal to 0");
|
||||
this.minNumWorkers = minNumWorkers;
|
||||
Preconditions.checkArgument(maxNumWorkers > 0,
|
||||
"maxNumWorkers must be greater than 0");
|
||||
|
|
|
@ -29,6 +29,8 @@ public class PendingTaskBasedWorkerProvisioningConfig extends SimpleWorkerProvis
|
|||
@JsonProperty
|
||||
private int maxScalingStep = 10;
|
||||
|
||||
@JsonProperty
|
||||
private int workerCapacityHint = -1;
|
||||
|
||||
public int getMaxScalingStep()
|
||||
{
|
||||
|
@ -76,4 +78,14 @@ public class PendingTaskBasedWorkerProvisioningConfig extends SimpleWorkerProvis
|
|||
return this;
|
||||
}
|
||||
|
||||
public int getWorkerCapacityHint()
|
||||
{
|
||||
return workerCapacityHint;
|
||||
}
|
||||
|
||||
public PendingTaskBasedWorkerProvisioningConfig setWorkerCapacityHint(int workerCapacityHint)
|
||||
{
|
||||
this.workerCapacityHint = workerCapacityHint;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.indexing.overlord.autoscaling;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Predicate;
|
||||
|
@ -60,11 +61,14 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
|
|||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerProvisioningStrategy.class);
|
||||
|
||||
public static final String ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET = "As minNumWorkers is set to 0, workerCapacityHint must be greater than 0. workerCapacityHint value set is %d";
|
||||
private static final String SCHEME = "http";
|
||||
|
||||
@VisibleForTesting
|
||||
@Nullable
|
||||
static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig(
|
||||
public static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig(
|
||||
Supplier<WorkerBehaviorConfig> workerConfigRef,
|
||||
SimpleWorkerProvisioningConfig config,
|
||||
String action,
|
||||
EmittingLogger log
|
||||
)
|
||||
|
@ -87,6 +91,13 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
|
|||
log.error("No autoScaler available, cannot %s workers", action);
|
||||
return null;
|
||||
}
|
||||
if (config instanceof PendingTaskBasedWorkerProvisioningConfig
|
||||
&& workerConfig.getAutoScaler().getMinNumWorkers() == 0
|
||||
&& ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint() <= 0
|
||||
) {
|
||||
log.error(ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint());
|
||||
return null;
|
||||
}
|
||||
return workerConfig;
|
||||
}
|
||||
|
||||
|
@ -157,7 +168,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
|
|||
Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
|
||||
log.debug("Workers: %d %s", workers.size(), workers);
|
||||
boolean didProvision = false;
|
||||
final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log);
|
||||
final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config, "provision", log);
|
||||
if (workerConfig == null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -246,13 +257,18 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
|
|||
log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount);
|
||||
final int currValidWorkers = getCurrValidWorkers(workers);
|
||||
|
||||
// If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need
|
||||
// since we are not aware of the expectedWorkerCapacity.
|
||||
int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
|
||||
// If there are no worker and workerCapacityHint config is not set (-1) or invalid (<= 0), then spin up minWorkerCount
|
||||
// as we cannot determine the exact capacity here to fulfill the need.
|
||||
// However, if there are no worker but workerCapacityHint config is set (>0), then we can
|
||||
// determine the number of workers needed using workerCapacityHint config as expected worker capacity
|
||||
int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityHint() <= 0
|
||||
? minWorkerCount
|
||||
: getWorkersNeededToAssignTasks(
|
||||
remoteTaskRunnerConfig,
|
||||
workerConfig,
|
||||
pendingTasks,
|
||||
workers
|
||||
workers,
|
||||
config.getWorkerCapacityHint()
|
||||
);
|
||||
log.debug("More workers needed: %d", moreWorkersNeeded);
|
||||
|
||||
|
@ -280,7 +296,8 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
|
|||
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
|
||||
final DefaultWorkerBehaviorConfig workerConfig,
|
||||
final Collection<Task> pendingTasks,
|
||||
final Collection<ImmutableWorkerInfo> workers
|
||||
final Collection<ImmutableWorkerInfo> workers,
|
||||
final int workerCapacityHint
|
||||
)
|
||||
{
|
||||
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
|
||||
|
@ -295,7 +312,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
|
|||
}
|
||||
WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy();
|
||||
int need = 0;
|
||||
int capacity = getExpectedWorkerCapacity(workers);
|
||||
int capacity = getExpectedWorkerCapacity(workers, workerCapacityHint);
|
||||
log.info("Expected worker capacity: %d", capacity);
|
||||
|
||||
// Simulate assigning tasks to dummy workers using configured workerSelectStrategy
|
||||
|
@ -333,7 +350,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
|
|||
{
|
||||
Collection<ImmutableWorkerInfo> zkWorkers = runner.getWorkers();
|
||||
log.debug("Workers: %d [%s]", zkWorkers.size(), zkWorkers);
|
||||
final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log);
|
||||
final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config, "terminate", log);
|
||||
if (workerConfig == null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -441,12 +458,18 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
|
|||
return currValidWorkers;
|
||||
}
|
||||
|
||||
private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers)
|
||||
private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers, final int workerCapacityHint)
|
||||
{
|
||||
int size = workers.size();
|
||||
if (size == 0) {
|
||||
// No existing workers assume capacity per worker as 1
|
||||
// No existing workers
|
||||
if (workerCapacityHint > 0) {
|
||||
// Return workerCapacityHint if it is set in config
|
||||
return workerCapacityHint;
|
||||
} else {
|
||||
// Assume capacity per worker as 1
|
||||
return 1;
|
||||
}
|
||||
} else {
|
||||
// Assume all workers have same capacity
|
||||
return workers.iterator().next().getWorker().getCapacity();
|
||||
|
|
|
@ -121,7 +121,7 @@ public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioning
|
|||
Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
|
||||
boolean didProvision = false;
|
||||
final DefaultWorkerBehaviorConfig workerConfig =
|
||||
PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log);
|
||||
PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config, "provision", log);
|
||||
if (workerConfig == null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ public class SimpleWorkerProvisioningStrategy extends AbstractWorkerProvisioning
|
|||
{
|
||||
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
|
||||
final DefaultWorkerBehaviorConfig workerConfig =
|
||||
PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log);
|
||||
PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config, "terminate", log);
|
||||
if (workerConfig == null) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.druid.indexing.overlord.autoscaling;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.common.guava.DSuppliers;
|
||||
import org.apache.druid.indexer.TaskLocation;
|
||||
|
@ -43,6 +44,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
|
|||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
|
||||
import org.easymock.Capture;
|
||||
import org.easymock.EasyMock;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Period;
|
||||
|
@ -106,10 +108,35 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailIfMinWorkerIsZeroAndWorkerHintNotSet()
|
||||
{
|
||||
EmittingLogger mockLogger = EasyMock.createMock(EmittingLogger.class);
|
||||
Capture<String> capturedArgument = Capture.newInstance();
|
||||
mockLogger.error(EasyMock.capture(capturedArgument), EasyMock.anyInt());
|
||||
|
||||
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
|
||||
.setMaxScalingDuration(new Period(1000))
|
||||
.setNumEventsToTrack(10)
|
||||
.setPendingTaskTimeout(new Period(0))
|
||||
.setWorkerVersion(MIN_VERSION)
|
||||
.setMaxScalingStep(2);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
|
||||
EasyMock.replay(autoScaler, mockLogger);
|
||||
DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(
|
||||
DSuppliers.of(workerConfig),
|
||||
config,
|
||||
"test",
|
||||
mockLogger
|
||||
);
|
||||
Assert.assertNull(defaultWorkerBehaviorConfig);
|
||||
Assert.assertEquals(PendingTaskBasedWorkerProvisioningStrategy.ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, capturedArgument.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccessfulInitialMinWorkersProvision()
|
||||
{
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(new ArrayList<String>());
|
||||
|
@ -137,10 +164,105 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldProvisionMinimumAsCurrentIsBelowMinimum()
|
||||
{
|
||||
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
|
||||
.setMaxScalingDuration(new Period(1000))
|
||||
.setNumEventsToTrack(10)
|
||||
.setPendingTaskTimeout(new Period(0))
|
||||
.setWorkerVersion(MIN_VERSION)
|
||||
.setMaxScalingStep(2)
|
||||
.setWorkerCapacityHint(30);
|
||||
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
|
||||
config,
|
||||
DSuppliers.of(workerConfig),
|
||||
new ProvisioningSchedulerConfig(),
|
||||
new Supplier<ScheduledExecutorService>()
|
||||
{
|
||||
@Override
|
||||
public ScheduledExecutorService get()
|
||||
{
|
||||
return executorService;
|
||||
}
|
||||
}
|
||||
);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(new ArrayList<String>());
|
||||
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
|
||||
// No pending tasks
|
||||
EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
|
||||
new ArrayList<>()
|
||||
);
|
||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||
Collections.emptyList()
|
||||
);
|
||||
EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
|
||||
EasyMock.expect(autoScaler.provision()).andReturn(
|
||||
new AutoScalingData(Collections.singletonList("aNode"))
|
||||
).times(3);
|
||||
EasyMock.replay(runner, autoScaler);
|
||||
Provisioner provisioner = strategy.makeProvisioner(runner);
|
||||
boolean provisionedSomething = provisioner.doProvision();
|
||||
Assert.assertTrue(provisionedSomething);
|
||||
Assert.assertTrue(provisioner.getStats().toList().size() == 3);
|
||||
for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) {
|
||||
Assert.assertTrue(
|
||||
event.getEvent() == ScalingStats.EVENT.PROVISION
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldNotProvisionAsMinimumIsZero()
|
||||
{
|
||||
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
|
||||
.setMaxScalingDuration(new Period(1000))
|
||||
.setNumEventsToTrack(10)
|
||||
.setPendingTaskTimeout(new Period(0))
|
||||
.setWorkerVersion(MIN_VERSION)
|
||||
.setMaxScalingStep(2)
|
||||
.setWorkerCapacityHint(30);
|
||||
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
|
||||
config,
|
||||
DSuppliers.of(workerConfig),
|
||||
new ProvisioningSchedulerConfig(),
|
||||
new Supplier<ScheduledExecutorService>()
|
||||
{
|
||||
@Override
|
||||
public ScheduledExecutorService get()
|
||||
{
|
||||
return executorService;
|
||||
}
|
||||
}
|
||||
);
|
||||
// minWorkerCount is 0
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(new ArrayList<String>());
|
||||
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
|
||||
// No pending tasks
|
||||
EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
|
||||
new ArrayList<>()
|
||||
);
|
||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||
Collections.emptyList()
|
||||
);
|
||||
EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
|
||||
EasyMock.replay(runner, autoScaler);
|
||||
Provisioner provisioner = strategy.makeProvisioner(runner);
|
||||
boolean provisionedSomething = provisioner.doProvision();
|
||||
Assert.assertFalse(provisionedSomething);
|
||||
Assert.assertEquals(0, provisioner.getStats().toList().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccessfulMinWorkersProvision()
|
||||
{
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(new ArrayList<String>());
|
||||
|
@ -174,7 +296,7 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
@Test
|
||||
public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning()
|
||||
{
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(new ArrayList<String>());
|
||||
|
@ -207,9 +329,9 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSomethingProvisioning()
|
||||
public void testProvisioning()
|
||||
{
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(new ArrayList<String>()).times(2);
|
||||
|
@ -257,6 +379,153 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
EasyMock.verify(runner);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButNonEmptyCurrentlyRunningWorkerShouldUseCapcityFromRunningWorker()
|
||||
{
|
||||
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
|
||||
.setMaxScalingDuration(new Period(1000))
|
||||
.setNumEventsToTrack(10)
|
||||
.setPendingTaskTimeout(new Period(0))
|
||||
.setWorkerVersion(MIN_VERSION)
|
||||
.setMaxScalingStep(2)
|
||||
.setWorkerCapacityHint(30);
|
||||
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
|
||||
config,
|
||||
DSuppliers.of(workerConfig),
|
||||
new ProvisioningSchedulerConfig(),
|
||||
new Supplier<ScheduledExecutorService>()
|
||||
{
|
||||
@Override
|
||||
public ScheduledExecutorService get()
|
||||
{
|
||||
return executorService;
|
||||
}
|
||||
}
|
||||
);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(new ArrayList<String>()).times(2);
|
||||
EasyMock.expect(autoScaler.provision()).andReturn(
|
||||
new AutoScalingData(Collections.singletonList("fake"))
|
||||
).times(2);
|
||||
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
|
||||
// two pending tasks
|
||||
EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
|
||||
ImmutableList.of(
|
||||
NoopTask.create(),
|
||||
NoopTask.create()
|
||||
)
|
||||
).times(2);
|
||||
// Capacity for current worker is 1
|
||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||
Arrays.asList(
|
||||
new TestZkWorker(testTask).toImmutable(),
|
||||
new TestZkWorker(testTask, "http", "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node
|
||||
)
|
||||
).times(2);
|
||||
EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1);
|
||||
EasyMock.replay(runner);
|
||||
EasyMock.replay(autoScaler);
|
||||
|
||||
Provisioner provisioner = strategy.makeProvisioner(runner);
|
||||
boolean provisionedSomething = provisioner.doProvision();
|
||||
|
||||
// Expect to use capacity from current worker (which is 1)
|
||||
// and since there are two pending tasks, we will need two more workers
|
||||
Assert.assertTrue(provisionedSomething);
|
||||
Assert.assertEquals(2, provisioner.getStats().toList().size());
|
||||
DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
|
||||
Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent());
|
||||
Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(1).getEvent());
|
||||
|
||||
provisionedSomething = provisioner.doProvision();
|
||||
|
||||
Assert.assertFalse(provisionedSomething);
|
||||
Assert.assertTrue(
|
||||
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
|
||||
);
|
||||
DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
|
||||
Assert.assertTrue(
|
||||
createdTime.equals(anotherCreatedTime)
|
||||
);
|
||||
|
||||
EasyMock.verify(autoScaler);
|
||||
EasyMock.verify(runner);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButEmptyCurrentlyRunningWorkerShouldUseCapcityFromHintConfig()
|
||||
{
|
||||
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
|
||||
.setMaxScalingDuration(new Period(1000))
|
||||
.setNumEventsToTrack(10)
|
||||
.setPendingTaskTimeout(new Period(0))
|
||||
.setWorkerVersion(MIN_VERSION)
|
||||
.setMaxScalingStep(2)
|
||||
.setWorkerCapacityHint(30);
|
||||
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
|
||||
config,
|
||||
DSuppliers.of(workerConfig),
|
||||
new ProvisioningSchedulerConfig(),
|
||||
new Supplier<ScheduledExecutorService>()
|
||||
{
|
||||
@Override
|
||||
public ScheduledExecutorService get()
|
||||
{
|
||||
return executorService;
|
||||
}
|
||||
}
|
||||
);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(new ArrayList<String>()).times(2);
|
||||
EasyMock.expect(autoScaler.provision()).andReturn(
|
||||
new AutoScalingData(Collections.singletonList("fake"))
|
||||
).times(1);
|
||||
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
|
||||
// two pending tasks
|
||||
EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
|
||||
ImmutableList.of(
|
||||
NoopTask.create(),
|
||||
NoopTask.create()
|
||||
)
|
||||
).times(2);
|
||||
// No currently running worker node
|
||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||
Collections.emptyList()
|
||||
).times(2);
|
||||
|
||||
EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1);
|
||||
EasyMock.replay(runner);
|
||||
EasyMock.replay(autoScaler);
|
||||
|
||||
Provisioner provisioner = strategy.makeProvisioner(runner);
|
||||
boolean provisionedSomething = provisioner.doProvision();
|
||||
|
||||
// Expect to use capacity from workerCapacityHint config (which is 30)
|
||||
// and since there are two pending tasks, we will need one more worker
|
||||
Assert.assertTrue(provisionedSomething);
|
||||
Assert.assertEquals(1, provisioner.getStats().toList().size());
|
||||
DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
|
||||
Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent());
|
||||
|
||||
provisionedSomething = provisioner.doProvision();
|
||||
|
||||
Assert.assertFalse(provisionedSomething);
|
||||
Assert.assertTrue(
|
||||
provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
|
||||
);
|
||||
DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
|
||||
Assert.assertTrue(
|
||||
createdTime.equals(anotherCreatedTime)
|
||||
);
|
||||
|
||||
EasyMock.verify(autoScaler);
|
||||
EasyMock.verify(runner);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProvisionAlert() throws Exception
|
||||
{
|
||||
|
@ -266,7 +535,7 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
EasyMock.expectLastCall();
|
||||
EasyMock.replay(emitter);
|
||||
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(new ArrayList<String>()).times(2);
|
||||
|
@ -323,7 +592,7 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
@Test
|
||||
public void testDoSuccessfulTerminate()
|
||||
{
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(new ArrayList<String>());
|
||||
EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn(
|
||||
|
@ -343,7 +612,8 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
)
|
||||
).times(2);
|
||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||
Collections.singletonList(
|
||||
ImmutableList.of(
|
||||
new TestZkWorker(testTask).toImmutable(),
|
||||
new TestZkWorker(testTask).toImmutable()
|
||||
)
|
||||
).times(2);
|
||||
|
@ -367,7 +637,7 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
@Test
|
||||
public void testSomethingTerminating()
|
||||
{
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(Collections.singletonList("ip")).times(2);
|
||||
EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn(
|
||||
|
@ -377,7 +647,9 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
|
||||
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
|
||||
EasyMock.expect(runner.getWorkers()).andReturn(
|
||||
Collections.singletonList(
|
||||
ImmutableList.of(
|
||||
new TestZkWorker(testTask).toImmutable(),
|
||||
new TestZkWorker(testTask).toImmutable(),
|
||||
new TestZkWorker(testTask).toImmutable()
|
||||
)
|
||||
).times(2);
|
||||
|
@ -411,7 +683,7 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
public void testNoActionNeeded()
|
||||
{
|
||||
EasyMock.reset(autoScaler);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(Collections.singletonList("ip"));
|
||||
EasyMock.replay(autoScaler);
|
||||
|
@ -442,7 +714,7 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
EasyMock.verify(autoScaler);
|
||||
|
||||
EasyMock.reset(autoScaler);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(Collections.singletonList("ip"));
|
||||
|
@ -460,7 +732,7 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
{
|
||||
// Don't terminate anything
|
||||
EasyMock.reset(autoScaler);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(Collections.singletonList("ip"));
|
||||
EasyMock.replay(autoScaler);
|
||||
|
@ -487,7 +759,7 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
|
||||
// Don't provision anything
|
||||
EasyMock.reset(autoScaler);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(Collections.singletonList("ip"));
|
||||
|
@ -498,7 +770,7 @@ public class PendingTaskBasedProvisioningStrategyTest
|
|||
|
||||
EasyMock.reset(autoScaler);
|
||||
// Increase minNumWorkers
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
|
||||
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
|
||||
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
|
||||
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
|
||||
.andReturn(Collections.singletonList("ip"));
|
||||
|
|
Loading…
Reference in New Issue