diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java index 2c45a0ec7b8..767afaa2fd3 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java @@ -268,6 +268,12 @@ public class KubernetesAndWorkerTaskRunner implements TaskLogStreamer, WorkerTas return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity); } + @Override + public int getMaximumCapacityWithAutoscale() + { + return workerTaskRunner.getMaximumCapacityWithAutoscale() + kubernetesTaskRunner.getMaximumCapacityWithAutoscale(); + } + @Override public int getUsedCapacity() { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java index 3ab515cc6e5..1cc3be34e38 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java @@ -371,4 +371,14 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport runner.updateLocation(task, TaskLocation.unknown()); verifyAll(); } + + @Test + public void test_getMaximumCapacity() + { + EasyMock.expect(kubernetesTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(1); + EasyMock.expect(workerTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(1); + replayAll(); + Assert.assertEquals(2, runner.getMaximumCapacityWithAutoscale()); + verifyAll(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 8ef718e97be..1d06321ddc4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -58,6 +58,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy; import org.apache.druid.indexing.worker.TaskAnnouncement; @@ -1648,6 +1649,35 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); } + /** + * Retrieves the maximum capacity of the task runner when autoscaling is enabled.* + * @return The maximum capacity as an integer value. Returns -1 if the maximum + * capacity cannot be determined or if autoscaling is not enabled. + */ + @Override + public int getMaximumCapacityWithAutoscale() + { + int maximumCapacity = -1; + WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get(); + if (workerBehaviorConfig == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); + maximumCapacity = -1; + } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { + DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; + if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); + maximumCapacity = -1; + } else { + int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); + int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers()); + maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; + } + } + return maximumCapacity; + } + @Override public int getUsedCapacity() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index 4c37af7ef16..3613b6fa08d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord; import com.google.common.base.Optional; import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.inject.Inject; @@ -33,7 +32,6 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.http.TaskStateLookup; import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse; -import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -49,7 +47,6 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -377,40 +374,11 @@ public class TaskQueryTool } TaskRunner taskRunner = taskRunnerOptional.get(); - Collection workers = taskRunner instanceof WorkerTaskRunner ? - ((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of(); - - int currentCapacity = taskRunner.getTotalCapacity(); - int usedCapacity = taskRunner.getUsedCapacity(); - // Calculate maximum capacity with auto scale - int maximumCapacity; - WorkerBehaviorConfig workerBehaviorConfig = getLatestWorkerConfig(); - if (workerBehaviorConfig == null) { - // Auto scale not setup - log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); - maximumCapacity = -1; - } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { - DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; - if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { - // Auto scale not setup - log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); - maximumCapacity = -1; - } else { - int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); - int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers); - maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; - } - } else { - // Auto-scale is not using DefaultWorkerBehaviorConfig - log.debug( - "Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity", - workerBehaviorConfig, - workerBehaviorConfig.getClass().getSimpleName() - ); - maximumCapacity = -1; - } - - return new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity); + return new TotalWorkerCapacityResponse( + taskRunner.getTotalCapacity(), + taskRunner.getMaximumCapacityWithAutoscale(), + taskRunner.getUsedCapacity() + ); } public WorkerBehaviorConfig getLatestWorkerConfig() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index ac1fd124ef5..2178bc433df 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -155,6 +155,15 @@ public interface TaskRunner return -1; } + /** + * The maximum number of tasks this TaskRunner can run concurrently with autoscaling hints. + * @return -1 if this method is not implemented or capacity can't be found. + */ + default int getMaximumCapacityWithAutoscale() + { + return -1; + } + /** * The current number of tasks this TaskRunner is running. * Can return -1 if this method is not implemented or the # of tasks can't be found. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index a2edb1eb6d1..72bd9cff174 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -64,6 +64,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; +import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy; import org.apache.druid.indexing.worker.TaskAnnouncement; @@ -1791,6 +1792,36 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); } + + /** + * Retrieves the maximum capacity of the task runner when autoscaling is enabled.* + * @return The maximum capacity as an integer value. Returns -1 if the maximum + * capacity cannot be determined or if autoscaling is not enabled. + */ + @Override + public int getMaximumCapacityWithAutoscale() + { + int maximumCapacity = -1; + WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get(); + if (workerBehaviorConfig == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); + maximumCapacity = -1; + } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { + DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; + if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); + maximumCapacity = -1; + } else { + int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); + int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers()); + maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; + } + } + return maximumCapacity; + } + @Override public int getUsedCapacity() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index c1a77c1f4b4..2ab63cd79ee 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -47,6 +47,8 @@ import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceActio import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; +import org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; @@ -153,6 +155,7 @@ public class RemoteTaskRunnerTest Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity()); + Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale()); Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity()); @@ -608,6 +611,46 @@ public class RemoteTaskRunnerTest ); } + @Test + public void testGetMaximumCapacity_noWorkerConfig() + { + httpClient = EasyMock.createMock(HttpClient.class); + remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner( + new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD), + new TestProvisioningStrategy<>(), + httpClient, + null + ); + Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale()); + } + + @Test + public void testGetMaximumCapacity_noAutoScaler() + { + httpClient = EasyMock.createMock(HttpClient.class); + remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner( + new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD), + new TestProvisioningStrategy<>(), + httpClient, + new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null) + ); + Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale()); + } + + @Test + public void testGetMaximumCapacity_withAutoScaler() + { + httpClient = EasyMock.createMock(HttpClient.class); + remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner( + new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD), + new TestProvisioningStrategy<>(), + httpClient, + DefaultWorkerBehaviorConfig.defaultConfig() + ); + // Default autoscaler has max workers of 0 + Assert.assertEquals(0, remoteTaskRunner.getMaximumCapacityWithAutoscale()); + } + private void doSetup() throws Exception { makeWorker(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java index bdf886aa41b..7c579f19676 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -117,6 +117,21 @@ public class RemoteTaskRunnerTestUtils ProvisioningStrategy provisioningStrategy, HttpClient httpClient ) + { + return makeRemoteTaskRunner( + config, + provisioningStrategy, + httpClient, + DefaultWorkerBehaviorConfig.defaultConfig() + ); + } + + public RemoteTaskRunner makeRemoteTaskRunner( + RemoteTaskRunnerConfig config, + ProvisioningStrategy provisioningStrategy, + HttpClient httpClient, + WorkerBehaviorConfig workerBehaviorConfig + ) { RemoteTaskRunner remoteTaskRunner = new TestableRemoteTaskRunner( jsonMapper, @@ -134,7 +149,7 @@ public class RemoteTaskRunnerTestUtils cf, new PathChildrenCacheFactory.Builder(), httpClient, - DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), + DSuppliers.of(new AtomicReference<>(workerBehaviorConfig)), provisioningStrategy ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java new file mode 100644 index 00000000000..18cade99b9d --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.druid.indexing.overlord; + +import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService; +import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; +import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; + +import javax.annotation.Nonnull; +import java.util.Collection; + + +public class TestProvisioningStrategy implements ProvisioningStrategy +{ + @Override + public ProvisioningService makeProvisioningService(T runner) + { + return new ProvisioningService() + { + @Override + public void close() + { + // nothing to close + } + + @Override + public ScalingStats getStats() + { + return null; + } + }; + } + + @Override + public int getExpectedWorkerCapacity(@Nonnull Collection workers) + { + return 1; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 1bfac9f42a3..91b0778c950 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -42,11 +42,13 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.TestProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; +import org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy; import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.config.WorkerConfig; @@ -147,6 +149,7 @@ public class HttpRemoteTaskRunnerTest Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size()); Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size()); Assert.assertEquals(4, taskRunner.getTotalCapacity()); + Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale()); Assert.assertEquals(0, taskRunner.getUsedCapacity()); } @@ -1778,6 +1781,79 @@ public class HttpRemoteTaskRunnerTest Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size()); } + @Test + public void testGetMaximumCapacity_noWorkerConfig() + { + TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + EasyMock.replay(druidNodeDiscoveryProvider); + + HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( + TestHelper.makeJsonMapper(), + new HttpRemoteTaskRunnerConfig(), + EasyMock.createNiceMock(HttpClient.class), + DSuppliers.of(new AtomicReference<>(null)), + new TestProvisioningStrategy<>(), + druidNodeDiscoveryProvider, + EasyMock.createMock(TaskStorage.class), + EasyMock.createNiceMock(CuratorFramework.class), + new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), + new NoopServiceEmitter() + ); + Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale()); + } + + @Test + public void testGetMaximumCapacity_noAutoScaler() + { + TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + EasyMock.replay(druidNodeDiscoveryProvider); + + HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( + TestHelper.makeJsonMapper(), + new HttpRemoteTaskRunnerConfig(), + EasyMock.createNiceMock(HttpClient.class), + DSuppliers.of(new AtomicReference<>(new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null))), + new TestProvisioningStrategy<>(), + druidNodeDiscoveryProvider, + EasyMock.createMock(TaskStorage.class), + EasyMock.createNiceMock(CuratorFramework.class), + new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), + new NoopServiceEmitter() + ); + Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale()); + } + + @Test + public void testGetMaximumCapacity_withAutoScaler() + { + TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + EasyMock.replay(druidNodeDiscoveryProvider); + + HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( + TestHelper.makeJsonMapper(), + new HttpRemoteTaskRunnerConfig(), + EasyMock.createNiceMock(HttpClient.class), + DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), + new TestProvisioningStrategy<>(), + druidNodeDiscoveryProvider, + EasyMock.createMock(TaskStorage.class), + EasyMock.createNiceMock(CuratorFramework.class), + new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), + new NoopServiceEmitter() + ); + // Default autoscaler has max workers of 0 + Assert.assertEquals(0, taskRunner.getMaximumCapacityWithAutoscale()); + } + public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated( TaskStorage taskStorage, List listenerNotificationsAccumulator diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index c13a1571765..732d586002f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.DruidOverlord; -import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; @@ -52,14 +51,9 @@ import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; -import org.apache.druid.indexing.overlord.autoscaling.AutoScaler; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; -import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; -import org.apache.druid.indexing.worker.Worker; -import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; @@ -1321,6 +1315,7 @@ public class OverlordResourceTest .andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); + EasyMock.expect(taskRunner.getMaximumCapacityWithAutoscale()).andReturn(-1); EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); @@ -1332,130 +1327,26 @@ public class OverlordResourceTest } @Test - public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfigNotConfigured() - { - AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(null); - EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); - EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); - EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); - EasyMock.expect(overlord.isLeader()).andReturn(true); - replayAll(); - - final Response response = overlordResource.getTotalWorkerCapacity(); - Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); - } - - @Test - public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigured() - { - DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, null); - AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); - EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); - EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); - EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); - EasyMock.expect(overlord.isLeader()).andReturn(true); - replayAll(); - - final Response response = overlordResource.getTotalWorkerCapacity(); - Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); - } - - @Test - public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategySupportExpectedWorkerCapacity() + public void testGetTotalWorkerCapacityWithMaximumCapacity() { int expectedWorkerCapacity = 3; - int maxNumWorkers = 2; - WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class); - Collection workerInfos = ImmutableList.of( - new ImmutableWorkerInfo( - new Worker( - "http", "testWorker", "192.0.0.1", expectedWorkerCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY - ), - 2, - ImmutableSet.of("grp1", "grp2"), - ImmutableSet.of("task1", "task2"), - DateTimes.of("2015-01-01T01:01:01Z") - ) - ); - EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos); - EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity); - EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(0); - - EasyMock.reset(taskMaster); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn( - Optional.of(workerTaskRunner) - ).anyTimes(); - EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(expectedWorkerCapacity).anyTimes(); - AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); - EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers); - DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler); - AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); - EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); - EasyMock.replay( - workerTaskRunner, - autoScaler - ); + int expectedWorkerCapacityWithAutoscale = 10; + WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class); + AtomicReference workerBehaviorConfigAtomicReference + = new AtomicReference<>(workerBehaviorConfig); + EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)) + .andReturn(workerBehaviorConfigAtomicReference); + EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity); + EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(expectedWorkerCapacity); + EasyMock.expect(taskRunner.getMaximumCapacityWithAutoscale()).andReturn(expectedWorkerCapacityWithAutoscale); EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); + final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); - Assert.assertEquals(0, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); - Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); - } - - @Test - public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity() - { - int invalidExpectedCapacity = -1; - int currentTotalCapacity = 3; - int currentCapacityUsed = 2; - int maxNumWorkers = 2; - WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class); - Collection workerInfos = ImmutableList.of( - new ImmutableWorkerInfo( - new Worker( - "http", "testWorker", "192.0.0.1", currentTotalCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY - ), - currentCapacityUsed, - ImmutableSet.of("grp1", "grp2"), - ImmutableSet.of("task1", "task2"), - DateTimes.of("2015-01-01T01:01:01Z") - ) - ); - EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos); - EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(currentTotalCapacity); - EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(currentCapacityUsed); - EasyMock.reset(taskMaster); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn( - Optional.of(workerTaskRunner) - ).anyTimes(); - EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes(); - AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); - EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers); - DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler); - AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); - EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); - EasyMock.replay( - workerTaskRunner, - autoScaler - ); - EasyMock.expect(overlord.isLeader()).andReturn(true); - replayAll(); - final Response response = overlordResource.getTotalWorkerCapacity(); - Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); - Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(), ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); - Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); - Assert.assertEquals(currentTotalCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); - Assert.assertEquals(currentCapacityUsed, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); + Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); + Assert.assertEquals(expectedWorkerCapacityWithAutoscale, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); } @Test