Add maximumCapacity to taskRunner (#17107)

* Add maximumCapacity to taskRunner

* fix tests

* pr comments
This commit is contained in:
George Shiqi Wu 2024-10-07 15:03:51 -04:00 committed by GitHub
parent ff97c67945
commit 5d7c7a87ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 297 additions and 161 deletions

View File

@ -268,6 +268,12 @@ public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTas
return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity);
}
@Override
public int getMaximumCapacityWithAutoscale()
{
return workerTaskRunner.getMaximumCapacityWithAutoscale() + kubernetesTaskRunner.getMaximumCapacityWithAutoscale();
}
@Override
public int getUsedCapacity()
{

View File

@ -371,4 +371,14 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
runner.updateLocation(task, TaskLocation.unknown());
verifyAll();
}
@Test
public void test_getMaximumCapacity()
{
EasyMock.expect(kubernetesTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(1);
EasyMock.expect(workerTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(1);
replayAll();
Assert.assertEquals(2, runner.getMaximumCapacityWithAutoscale());
verifyAll();
}
}

View File

@ -58,6 +58,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
@ -1648,6 +1649,35 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
}
/**
* Retrieves the maximum capacity of the task runner when autoscaling is enabled.*
* @return The maximum capacity as an integer value. Returns -1 if the maximum
* capacity cannot be determined or if autoscaling is not enabled.
*/
@Override
public int getMaximumCapacityWithAutoscale()
{
int maximumCapacity = -1;
WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
if (workerBehaviorConfig == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
maximumCapacity = -1;
} else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
maximumCapacity = -1;
} else {
int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers());
maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
}
}
return maximumCapacity;
}
@Override
public int getUsedCapacity()
{

View File

@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
@ -33,7 +32,6 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.http.TaskStateLookup;
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
@ -49,7 +47,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -377,40 +374,11 @@ public class TaskQueryTool
}
TaskRunner taskRunner = taskRunnerOptional.get();
Collection<ImmutableWorkerInfo> workers = taskRunner instanceof WorkerTaskRunner ?
((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of();
int currentCapacity = taskRunner.getTotalCapacity();
int usedCapacity = taskRunner.getUsedCapacity();
// Calculate maximum capacity with auto scale
int maximumCapacity;
WorkerBehaviorConfig workerBehaviorConfig = getLatestWorkerConfig();
if (workerBehaviorConfig == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
maximumCapacity = -1;
} else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
maximumCapacity = -1;
} else {
int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers);
maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
}
} else {
// Auto-scale is not using DefaultWorkerBehaviorConfig
log.debug(
"Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity",
workerBehaviorConfig,
workerBehaviorConfig.getClass().getSimpleName()
);
maximumCapacity = -1;
}
return new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity);
return new TotalWorkerCapacityResponse(
taskRunner.getTotalCapacity(),
taskRunner.getMaximumCapacityWithAutoscale(),
taskRunner.getUsedCapacity()
);
}
public WorkerBehaviorConfig getLatestWorkerConfig()

View File

@ -155,6 +155,15 @@ public interface TaskRunner
return -1;
}
/**
* The maximum number of tasks this TaskRunner can run concurrently with autoscaling hints.
* @return -1 if this method is not implemented or capacity can't be found.
*/
default int getMaximumCapacityWithAutoscale()
{
return -1;
}
/**
* The current number of tasks this TaskRunner is running.
* Can return -1 if this method is not implemented or the # of tasks can't be found.

View File

@ -64,6 +64,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
@ -1791,6 +1792,36 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
}
/**
* Retrieves the maximum capacity of the task runner when autoscaling is enabled.*
* @return The maximum capacity as an integer value. Returns -1 if the maximum
* capacity cannot be determined or if autoscaling is not enabled.
*/
@Override
public int getMaximumCapacityWithAutoscale()
{
int maximumCapacity = -1;
WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
if (workerBehaviorConfig == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
maximumCapacity = -1;
} else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
maximumCapacity = -1;
} else {
int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers());
maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
}
}
return maximumCapacity;
}
@Override
public int getUsedCapacity()
{

View File

@ -47,6 +47,8 @@ import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceActio
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
@ -153,6 +155,7 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity());
Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale());
Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity());
@ -608,6 +611,46 @@ public class RemoteTaskRunnerTest
);
}
@Test
public void testGetMaximumCapacity_noWorkerConfig()
{
httpClient = EasyMock.createMock(HttpClient.class);
remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
new TestProvisioningStrategy<>(),
httpClient,
null
);
Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale());
}
@Test
public void testGetMaximumCapacity_noAutoScaler()
{
httpClient = EasyMock.createMock(HttpClient.class);
remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
new TestProvisioningStrategy<>(),
httpClient,
new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null)
);
Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale());
}
@Test
public void testGetMaximumCapacity_withAutoScaler()
{
httpClient = EasyMock.createMock(HttpClient.class);
remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
new TestProvisioningStrategy<>(),
httpClient,
DefaultWorkerBehaviorConfig.defaultConfig()
);
// Default autoscaler has max workers of 0
Assert.assertEquals(0, remoteTaskRunner.getMaximumCapacityWithAutoscale());
}
private void doSetup() throws Exception
{
makeWorker();

View File

@ -117,6 +117,21 @@ public class RemoteTaskRunnerTestUtils
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
HttpClient httpClient
)
{
return makeRemoteTaskRunner(
config,
provisioningStrategy,
httpClient,
DefaultWorkerBehaviorConfig.defaultConfig()
);
}
public RemoteTaskRunner makeRemoteTaskRunner(
RemoteTaskRunnerConfig config,
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
HttpClient httpClient,
WorkerBehaviorConfig workerBehaviorConfig
)
{
RemoteTaskRunner remoteTaskRunner = new TestableRemoteTaskRunner(
jsonMapper,
@ -134,7 +149,7 @@ public class RemoteTaskRunnerTestUtils
cf,
new PathChildrenCacheFactory.Builder(),
httpClient,
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
DSuppliers.of(new AtomicReference<>(workerBehaviorConfig)),
provisioningStrategy
);

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import javax.annotation.Nonnull;
import java.util.Collection;
public class TestProvisioningStrategy<T extends TaskRunner> implements ProvisioningStrategy<T>
{
@Override
public ProvisioningService makeProvisioningService(T runner)
{
return new ProvisioningService()
{
@Override
public void close()
{
// nothing to close
}
@Override
public ScalingStats getStats()
{
return null;
}
};
}
@Override
public int getExpectedWorkerCapacity(@Nonnull Collection<ImmutableWorkerInfo> workers)
{
return 1;
}
}

View File

@ -42,11 +42,13 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TestProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
@ -147,6 +149,7 @@ public class HttpRemoteTaskRunnerTest
Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size());
Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
Assert.assertEquals(4, taskRunner.getTotalCapacity());
Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
Assert.assertEquals(0, taskRunner.getUsedCapacity());
}
@ -1778,6 +1781,79 @@ public class HttpRemoteTaskRunnerTest
Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
}
@Test
public void testGetMaximumCapacity_noWorkerConfig()
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig(),
EasyMock.createNiceMock(HttpClient.class),
DSuppliers.of(new AtomicReference<>(null)),
new TestProvisioningStrategy<>(),
druidNodeDiscoveryProvider,
EasyMock.createMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
);
Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
}
@Test
public void testGetMaximumCapacity_noAutoScaler()
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig(),
EasyMock.createNiceMock(HttpClient.class),
DSuppliers.of(new AtomicReference<>(new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null))),
new TestProvisioningStrategy<>(),
druidNodeDiscoveryProvider,
EasyMock.createMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
);
Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
}
@Test
public void testGetMaximumCapacity_withAutoScaler()
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig(),
EasyMock.createNiceMock(HttpClient.class),
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
new TestProvisioningStrategy<>(),
druidNodeDiscoveryProvider,
EasyMock.createMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
);
// Default autoscaler has max workers of 0
Assert.assertEquals(0, taskRunner.getMaximumCapacityWithAutoscale());
}
public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
TaskStorage taskStorage,
List<Object> listenerNotificationsAccumulator

View File

@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DruidOverlord;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskMaster;
@ -52,14 +51,9 @@ import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
@ -1321,6 +1315,7 @@ public class OverlordResourceTest
.andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getMaximumCapacityWithAutoscale()).andReturn(-1);
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll();
@ -1332,130 +1327,26 @@ public class OverlordResourceTest
}
@Test
public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfigNotConfigured()
{
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(null);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@Test
public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigured()
{
DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, null);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@Test
public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategySupportExpectedWorkerCapacity()
public void testGetTotalWorkerCapacityWithMaximumCapacity()
{
int expectedWorkerCapacity = 3;
int maxNumWorkers = 2;
WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class);
Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", expectedWorkerCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z")
)
);
EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity);
EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(0);
EasyMock.reset(taskMaster);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.of(workerTaskRunner)
).anyTimes();
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(expectedWorkerCapacity).anyTimes();
AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.replay(
workerTaskRunner,
autoScaler
);
int expectedWorkerCapacityWithAutoscale = 10;
WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
= new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class))
.andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(expectedWorkerCapacity);
EasyMock.expect(taskRunner.getMaximumCapacityWithAutoscale()).andReturn(expectedWorkerCapacityWithAutoscale);
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(0, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@Test
public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity()
{
int invalidExpectedCapacity = -1;
int currentTotalCapacity = 3;
int currentCapacityUsed = 2;
int maxNumWorkers = 2;
WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class);
Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", currentTotalCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY
),
currentCapacityUsed,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z")
)
);
EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(currentTotalCapacity);
EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(currentCapacityUsed);
EasyMock.reset(taskMaster);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.of(workerTaskRunner)
).anyTimes();
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes();
AutoScaler<?> autoScaler = EasyMock.createMock(AutoScaler.class);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.replay(
workerTaskRunner,
autoScaler
);
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(), ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
Assert.assertEquals(currentTotalCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(currentCapacityUsed, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(expectedWorkerCapacityWithAutoscale, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@Test