From 2b8e7fc0b4ab76af836594d3b14e9213fa64cff0 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Sun, 6 Feb 2022 20:46:05 -0800 Subject: [PATCH] Add a flag to allow auto compaction task slot ratio to consider auto scaler slots (#12228) * add impl * fix checkstyle * add unit tests * checkstyle * add IT * fix IT * add comments * fix checkstyle --- ...ngTaskBasedWorkerProvisioningStrategy.java | 15 +- .../autoscaling/ProvisioningStrategy.java | 15 ++ .../overlord/http/OverlordResource.java | 71 ++++++- .../http/TotalWorkerCapacityResponse.java | 62 ++++++ ...dingTaskBasedProvisioningStrategyTest.java | 102 ++++++++- .../overlord/http/OverlordResourceTest.java | 196 +++++++++++++++++- .../indexing/overlord/http/OverlordTest.java | 3 +- .../clients/CompactionResourceTestClient.java | 19 +- .../ITAutoCompactionLockContentionTest.java | 2 +- .../duty/ITAutoCompactionTest.java | 31 ++- .../indexing/HttpIndexingServiceClient.java | 26 +++ .../indexing/IndexingServiceClient.java | 9 + .../IndexingTotalWorkerCapacityInfo.java | 62 ++++++ .../CoordinatorCompactionConfig.java | 64 +++--- .../coordinator/duty/CompactSegments.java | 15 +- .../CoordinatorCompactionConfigsResource.java | 4 +- .../HttpIndexingServiceClientTest.java | 31 +++ .../indexing/NoopIndexingServiceClient.java | 6 + .../coordinator/duty/CompactSegmentsTest.java | 55 ++++- ...rdinatorCompactionConfigsResourceTest.java | 4 + 20 files changed, 737 insertions(+), 55 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java create mode 100644 server/src/main/java/org/apache/druid/client/indexing/IndexingTotalWorkerCapacityInfo.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index 28d1bef3369..7d83b3d0e87 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -267,8 +267,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr remoteTaskRunnerConfig, workerConfig, pendingTasks, - workers, - config.getWorkerCapacityHint() + workers ); log.debug("More workers needed: %d", moreWorkersNeeded); @@ -296,8 +295,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr final WorkerTaskRunnerConfig workerTaskRunnerConfig, final DefaultWorkerBehaviorConfig workerConfig, final Collection pendingTasks, - final Collection workers, - final int workerCapacityHint + final Collection workers ) { final Collection validWorkers = Collections2.filter( @@ -312,7 +310,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr } WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy(); int need = 0; - int capacity = getExpectedWorkerCapacity(workers, workerCapacityHint); + int capacity = getExpectedWorkerCapacity(workers); log.info("Expected worker capacity: %d", capacity); // Simulate assigning tasks to dummy workers using configured workerSelectStrategy @@ -458,14 +456,15 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr return currValidWorkers; } - private static int getExpectedWorkerCapacity(final Collection workers, final int workerCapacityHint) + @Override + public int getExpectedWorkerCapacity(final Collection workers) { int size = workers.size(); if (size == 0) { // No existing workers - if (workerCapacityHint > 0) { + if (config.getWorkerCapacityHint() > 0) { // Return workerCapacityHint if it is set in config - return workerCapacityHint; + return config.getWorkerCapacityHint(); } else { // Assume capacity per worker as 1 return 1; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java index 1f940fd2e44..d4e3b5b4d35 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/ProvisioningStrategy.java @@ -20,8 +20,12 @@ package org.apache.druid.indexing.overlord.autoscaling; import org.apache.druid.guice.annotations.ExtensionPoint; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.TaskRunner; +import javax.annotation.Nonnull; +import java.util.Collection; + /** * In general, the resource management is tied to the runner. */ @@ -35,4 +39,15 @@ public interface ProvisioningStrategy * @param runner The TaskRunner state holder this strategy should use during execution */ ProvisioningService makeProvisioningService(T runner); + + /** + * Returns the expected number of task slots available for each worker. + * This method can returns -1 if the provisioning strategy does not support getting the expected worker capacity. + * + * @return the expected number of task slots available for each worker if provisioning strategy support getting the expected worker capacity, otherwise -1 + */ + default int getExpectedWorkerCapacity(@Nonnull Collection workers) + { + return -1; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 0076eb0335b..85bb3dbe3e2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -45,6 +45,7 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionHolder; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueue; @@ -53,8 +54,10 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; +import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter; +import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -122,6 +125,7 @@ public class OverlordResource private final AuditManager auditManager; private final AuthorizerMapper authorizerMapper; private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter; + private final ProvisioningStrategy provisioningStrategy; private AtomicReference workerConfigRef = null; private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete"); @@ -135,7 +139,8 @@ public class OverlordResource JacksonConfigManager configManager, AuditManager auditManager, AuthorizerMapper authorizerMapper, - WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter + WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter, + ProvisioningStrategy provisioningStrategy ) { this.taskMaster = taskMaster; @@ -146,6 +151,7 @@ public class OverlordResource this.auditManager = auditManager; this.authorizerMapper = authorizerMapper; this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter; + this.provisioningStrategy = provisioningStrategy; } /** @@ -422,6 +428,69 @@ public class OverlordResource return Response.ok(workerConfigRef.get()).build(); } + /** + * Gets the total worker capacity of varies states of the cluster. + */ + @GET + @Path("/totalWorkerCapacity") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(ConfigResourceFilter.class) + public Response getTotalWorkerCapacity() + { + // Calculate current cluster capacity + int currentCapacity; + Optional taskRunnerOptional = taskMaster.getTaskRunner(); + if (!taskRunnerOptional.isPresent()) { + // Cannot serve call as not leader + return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); + } + TaskRunner taskRunner = taskRunnerOptional.get(); + Collection workers; + if (taskRunner instanceof WorkerTaskRunner) { + workers = ((WorkerTaskRunner) taskRunner).getWorkers(); + currentCapacity = workers.stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); + } else { + log.debug( + "Cannot calculate capacity as task runner [%s] of type [%s] does not support listing workers", + taskRunner, + taskRunner.getClass().getName() + ); + workers = ImmutableList.of(); + currentCapacity = -1; + } + + // Calculate maximum capacity with auto scale + int maximumCapacity; + if (workerConfigRef == null) { + workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class); + } + WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get(); + if (workerBehaviorConfig == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); + maximumCapacity = -1; + } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { + DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; + if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); + maximumCapacity = -1; + } else { + int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); + int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers); + maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; + } + } else { + // Auto scale is not using DefaultWorkerBehaviorConfig + log.debug("Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity", + workerBehaviorConfig, + workerBehaviorConfig.getClass().getSimpleName() + ); + maximumCapacity = -1; + } + return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity)).build(); + } + // default value is used for backwards compatibility @POST @Path("/worker") diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java new file mode 100644 index 00000000000..f40a8f2466a --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Should be synchronized with org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo + */ +public class TotalWorkerCapacityResponse +{ + /** + * The total worker capacity of the current state of the cluster. This can be -1 if + * it cannot be determined. + */ + private final int currentClusterCapacity; + /** + * The total worker capacity of the cluster including auto scaling capability (scaling to max workers). + * This can be -1 if it cannot be determined or if auto scaling is not configured. + */ + private final int maximumCapacityWithAutoScale; + + @JsonCreator + public TotalWorkerCapacityResponse( + @JsonProperty("currentClusterCapacity") int currentClusterCapacity, + @JsonProperty("maximumCapacityWithAutoScale") int maximumCapacityWithAutoScale + ) + { + this.currentClusterCapacity = currentClusterCapacity; + this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale; + } + + @JsonProperty + public int getCurrentClusterCapacity() + { + return currentClusterCapacity; + } + + @JsonProperty + public int getMaximumCapacityWithAutoScale() + { + return maximumCapacityWithAutoScale; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java index 34af0cadc56..249f510d818 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java @@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TestTasks; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.RemoteTaskRunner; import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem; import org.apache.druid.indexing.overlord.ZkWorker; @@ -54,8 +55,10 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -66,6 +69,7 @@ public class PendingTaskBasedProvisioningStrategyTest { private AutoScaler autoScaler; private Task testTask; + private PendingTaskBasedWorkerProvisioningConfig config; private PendingTaskBasedWorkerProvisioningStrategy strategy; private AtomicReference workerConfig; private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service"); @@ -79,7 +83,7 @@ public class PendingTaskBasedProvisioningStrategyTest testTask = TestTasks.immediateSuccess("task1"); - PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + config = new PendingTaskBasedWorkerProvisioningConfig() .setMaxScalingDuration(new Period(1000)) .setNumEventsToTrack(10) .setPendingTaskTimeout(new Period(0)) @@ -108,6 +112,102 @@ public class PendingTaskBasedProvisioningStrategyTest ); } + @Test + public void testGetExpectedWorkerCapacityWithNoWorkerAndHintIsValid() + { + int capacityHint = 10; + config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2) + .setWorkerCapacityHint(capacityHint); + strategy = new PendingTaskBasedWorkerProvisioningStrategy( + config, + DSuppliers.of(workerConfig), + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } + ); + int expectedWorkerCapacity = strategy.getExpectedWorkerCapacity(ImmutableList.of()); + Assert.assertEquals(capacityHint, expectedWorkerCapacity); + } + + @Test + public void testGetExpectedWorkerCapacityWithNoWorkerAndHintIsNotValid() + { + int capacityHint = -1; + config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2) + .setWorkerCapacityHint(capacityHint); + strategy = new PendingTaskBasedWorkerProvisioningStrategy( + config, + DSuppliers.of(workerConfig), + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } + ); + int expectedWorkerCapacity = strategy.getExpectedWorkerCapacity(ImmutableList.of()); + Assert.assertEquals(1, expectedWorkerCapacity); + } + + @Test + public void testGetExpectedWorkerCapacityWithSingleWorker() + { + int workerCapacity = 3; + Collection workerInfoCollection = ImmutableList.of( + new ImmutableWorkerInfo( + new Worker("http", "localhost0", "localhost0", workerCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ) + ); + int expectedWorkerCapacity = strategy.getExpectedWorkerCapacity(workerInfoCollection); + Assert.assertEquals(workerCapacity, expectedWorkerCapacity); + } + + @Test + public void testGetExpectedWorkerCapacityWithMultipleWorker() + { + int workerOneCapacity = 3; + int workerTwoCapacity = 6; + Collection workerInfoCollection = ImmutableList.of( + new ImmutableWorkerInfo( + new Worker("http", "localhost0", "localhost0", workerOneCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ), + new ImmutableWorkerInfo( + new Worker("http", "localhost0", "localhost0", workerTwoCapacity + 3, "v1", WorkerConfig.DEFAULT_CATEGORY), 0, + new HashSet<>(), + new HashSet<>(), + DateTimes.nowUtc() + ) + ); + int expectedWorkerCapacity = strategy.getExpectedWorkerCapacity(workerInfoCollection); + // Use capacity of the first worker in the list + Assert.assertEquals(workerOneCapacity, expectedWorkerCapacity); + } + @Test public void testFailIfMinWorkerIsZeroAndWorkerHintNotSet() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index b919706923b..638a33d829b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -24,7 +24,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; @@ -37,13 +39,21 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter; +import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; +import org.apache.druid.indexing.overlord.autoscaling.AutoScaler; +import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; +import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; +import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; +import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; @@ -79,11 +89,14 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; public class OverlordResourceTest { private OverlordResource overlordResource; private TaskMaster taskMaster; + private JacksonConfigManager configManager; + private ProvisioningStrategy provisioningStrategy; private TaskStorageQueryAdapter taskStorageQueryAdapter; private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter; private HttpServletRequest req; @@ -97,6 +110,8 @@ public class OverlordResourceTest public void setUp() { taskRunner = EasyMock.createMock(TaskRunner.class); + configManager = EasyMock.createMock(JacksonConfigManager.class); + provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); taskMaster = EasyMock.createStrictMock(TaskMaster.class); taskStorageQueryAdapter = EasyMock.createStrictMock(TaskStorageQueryAdapter.class); indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class); @@ -145,10 +160,11 @@ public class OverlordResourceTest taskStorageQueryAdapter, indexerMetadataStorageAdapter, null, - null, + configManager, null, authMapper, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + provisioningStrategy ); } @@ -1447,6 +1463,182 @@ public class OverlordResourceTest Assert.assertEquals(ImmutableMap.of("error", "Worker API returns error!"), response.getEntity()); } + @Test + public void testGetTotalWorkerCapacityNotLeader() + { + EasyMock.reset(taskMaster); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn( + Optional.absent() + ).anyTimes(); + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter, + configManager + ); + final Response response = overlordResource.getTotalWorkerCapacity(); + Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus()); + } + + @Test + public void testGetTotalWorkerCapacityWithUnknown() + { + WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class); + AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); + EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter, + configManager + ); + final Response response = overlordResource.getTotalWorkerCapacity(); + Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); + Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); + } + + @Test + public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfigNotConfigured() + { + AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(null); + EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter, + configManager + ); + final Response response = overlordResource.getTotalWorkerCapacity(); + Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); + Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); + } + + @Test + public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigured() + { + DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, null); + AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); + EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter, + configManager + ); + final Response response = overlordResource.getTotalWorkerCapacity(); + Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); + Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); + } + + @Test + public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategySupportExpectedWorkerCapacity() + { + int expectedWorkerCapacity = 3; + int maxNumWorkers = 2; + WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class); + Collection workerInfos = ImmutableList.of( + new ImmutableWorkerInfo( + new Worker( + "http", "testWorker", "192.0.0.1", expectedWorkerCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + DateTimes.of("2015-01-01T01:01:01Z") + ) + ); + EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos); + EasyMock.reset(taskMaster); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn( + Optional.of(workerTaskRunner) + ).anyTimes(); + EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(expectedWorkerCapacity).anyTimes(); + AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers); + DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler); + AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); + EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); + EasyMock.replay( + workerTaskRunner, + autoScaler, + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter, + configManager, + provisioningStrategy + ); + final Response response = overlordResource.getTotalWorkerCapacity(); + Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); + Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); + } + + @Test + public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity() + { + int invalidExpectedCapacity = -1; + int maxNumWorkers = 2; + WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class); + Collection workerInfos = ImmutableList.of( + new ImmutableWorkerInfo( + new Worker( + "http", "testWorker", "192.0.0.1", 3, "v1", WorkerConfig.DEFAULT_CATEGORY + ), + 2, + ImmutableSet.of("grp1", "grp2"), + ImmutableSet.of("task1", "task2"), + DateTimes.of("2015-01-01T01:01:01Z") + ) + ); + EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos); + EasyMock.reset(taskMaster); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn( + Optional.of(workerTaskRunner) + ).anyTimes(); + EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes(); + AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers); + DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler); + AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); + EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); + EasyMock.replay( + workerTaskRunner, + autoScaler, + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter, + configManager, + provisioningStrategy + ); + final Response response = overlordResource.getTotalWorkerCapacity(); + Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); + Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(), ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); + } + private void expectAuthorizationTokenCheck() { expectAuthorizationTokenCheck(Users.DRUID); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 5655a13345d..40ed6e02122 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -229,7 +229,8 @@ public class OverlordTest null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - workerTaskRunnerQueryAdapter + workerTaskRunnerQueryAdapter, + null ); Response response = overlordResource.getLeader(); Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity()); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java index 2fd1a5ee0f5..8e5ee737f03 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java @@ -145,12 +145,21 @@ public class CompactionResourceTestClient } } - public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer maxCompactionTaskSlots) throws Exception + public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception { - String url = StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s", - getCoordinatorURL(), - StringUtils.urlEncode(compactionTaskSlotRatio.toString()), - StringUtils.urlEncode(maxCompactionTaskSlots.toString())); + String url; + if (useAutoScaleSlots == null) { + url = StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s", + getCoordinatorURL(), + StringUtils.urlEncode(compactionTaskSlotRatio.toString()), + StringUtils.urlEncode(maxCompactionTaskSlots.toString())); + } else { + url = StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s&useAutoScaleSlots=%s", + getCoordinatorURL(), + StringUtils.urlEncode(compactionTaskSlotRatio.toString()), + StringUtils.urlEncode(maxCompactionTaskSlots.toString()), + StringUtils.urlEncode(useAutoScaleSlots.toString())); + } StatusResponseHolder response = httpClient.go(new Request(HttpMethod.POST, new URL(url)), responseHandler).get(); if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java index 84c1bf55aa5..8d980d76f12 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java @@ -306,7 +306,7 @@ public class ITAutoCompactionLockContentionTest extends AbstractKafkaIndexingSer { final DataSourceCompactionConfig compactionConfig = CompactionUtil .createCompactionConfig(fullDatasourceName, Specs.MAX_ROWS_PER_SEGMENT, Period.ZERO); - compactionResource.updateCompactionTaskSlot(0.5, 10); + compactionResource.updateCompactionTaskSlot(0.5, 10, null); compactionResource.submitCompactionConfig(compactionConfig); // Wait for compaction config to persist diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index a79fe883c87..f9c462a2600 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -102,7 +102,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest public void setup() throws Exception { // Set comapction slot to 5 - updateCompactionTaskSlot(0.5, 10); + updateCompactionTaskSlot(0.5, 10, null); fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix(); } @@ -235,7 +235,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception { // Set compactionTaskSlotRatio to 0 to prevent any compaction - updateCompactionTaskSlot(0, 0); + updateCompactionTaskSlot(0, 0, null); loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); @@ -252,7 +252,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest checkCompactionIntervals(intervalsBeforeCompaction); Assert.assertNull(compactionResource.getCompactionStatus(fullDatasourceName)); // Update compaction slots to be 1 - updateCompactionTaskSlot(1, 1); + updateCompactionTaskSlot(1, 1, null); // One day compacted (1 new segment) and one day remains uncompacted. (3 total) forceTriggerAutoCompaction(3); verifyQuery(INDEX_QUERIES_RESOURCE); @@ -898,6 +898,24 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } + @Test + public void testUpdateCompactionTaskSlotWithUseAutoScaleSlots() throws Exception + { + // First try update without useAutoScaleSlots + updateCompactionTaskSlot(3, 5, null); + CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); + // Should be default value which is false + Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots()); + // Now try update from default value to useAutoScaleSlots=true + updateCompactionTaskSlot(3, 5, true); + coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); + Assert.assertTrue(coordinatorCompactionConfig.isUseAutoScaleSlots()); + // Now try update from useAutoScaleSlots=true to useAutoScaleSlots=false + updateCompactionTaskSlot(3, 5, false); + coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); + Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots()); + } + private void loadData(String indexTask) throws Exception { loadData(indexTask, ImmutableMap.of()); @@ -1124,13 +1142,16 @@ public class ITAutoCompactionTest extends AbstractIndexerTest } } - private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCompactionTaskSlots) throws Exception + private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception { - compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, maxCompactionTaskSlots); + compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots); // Verify that the compaction config is updated correctly. CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs(); Assert.assertEquals(coordinatorCompactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio); Assert.assertEquals(coordinatorCompactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots); + if (useAutoScaleSlots != null) { + Assert.assertEquals(coordinatorCompactionConfig.isUseAutoScaleSlots(), useAutoScaleSlots.booleanValue()); + } } private void getAndAssertCompactionStatus( diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index ee21ad0c796..b63f3f09887 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -220,6 +220,32 @@ public class HttpIndexingServiceClient implements IndexingServiceClient } } + @Override + public int getTotalWorkerCapacityWithAutoScale() + { + try { + final StringFullResponseHolder response = druidLeaderClient.go( + druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/indexer/v1/totalWorkerCapacity") + .setHeader("Content-Type", MediaType.APPLICATION_JSON) + ); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while getting total worker capacity. status[%s] content[%s]", + response.getStatus(), + response.getContent() + ); + } + final IndexingTotalWorkerCapacityInfo indexingTotalWorkerCapacityInfo = jsonMapper.readValue( + response.getContent(), + new TypeReference() {} + ); + return indexingTotalWorkerCapacityInfo.getMaximumCapacityWithAutoScale(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override public List getActiveTasks() { diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index 7429c553f05..000f13254ac 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -49,8 +49,17 @@ public interface IndexingServiceClient @Nullable Map context ); + /** + * Gets the total worker capacity of the current state of the cluster. This can be -1 if it cannot be determined. + */ int getTotalWorkerCapacity(); + /** + * Gets the total worker capacity of the cluster including auto scaling capability (scaling to max workers). + * This can be -1 if it cannot be determined or if auto scaling is not configured. + */ + int getTotalWorkerCapacityWithAutoScale(); + String runTask(String taskId, Object taskObject); String cancelTask(String taskId); diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingTotalWorkerCapacityInfo.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingTotalWorkerCapacityInfo.java new file mode 100644 index 00000000000..bd3194d41ab --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingTotalWorkerCapacityInfo.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Should be synchronized with org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse + */ +public class IndexingTotalWorkerCapacityInfo +{ + /** + * The total worker capacity of the current state of the cluster. This can be -1 if + * it cannot be determined. + */ + private final int currentClusterCapacity; + /** + * The total worker capacity of the cluster including auto scaling capability (scaling to max workers). + * This can be -1 if it cannot be determined or if auto scaling is not configured. + */ + private final int maximumCapacityWithAutoScale; + + @JsonCreator + public IndexingTotalWorkerCapacityInfo( + @JsonProperty("currentClusterCapacity") int currentClusterCapacity, + @JsonProperty("maximumCapacityWithAutoScale") int maximumCapacityWithAutoScale + ) + { + this.currentClusterCapacity = currentClusterCapacity; + this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale; + } + + @JsonProperty + public int getCurrentClusterCapacity() + { + return currentClusterCapacity; + } + + @JsonProperty + public int getMaximumCapacityWithAutoScale() + { + return maximumCapacityWithAutoScale; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java index 409a8135ff1..1c4ab4e9a96 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java @@ -39,10 +39,12 @@ public class CoordinatorCompactionConfig private static final double DEFAULT_COMPACTION_TASK_RATIO = 0.1; private static final int DEFAILT_MAX_COMPACTION_TASK_SLOTS = Integer.MAX_VALUE; + private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false; private final List compactionConfigs; private final double compactionTaskSlotRatio; private final int maxCompactionTaskSlots; + private final boolean useAutoScaleSlots; public static CoordinatorCompactionConfig from( CoordinatorCompactionConfig baseConfig, @@ -52,31 +54,34 @@ public class CoordinatorCompactionConfig return new CoordinatorCompactionConfig( compactionConfigs, baseConfig.compactionTaskSlotRatio, - baseConfig.maxCompactionTaskSlots + baseConfig.maxCompactionTaskSlots, + baseConfig.useAutoScaleSlots ); } public static CoordinatorCompactionConfig from( CoordinatorCompactionConfig baseConfig, @Nullable Double compactionTaskSlotRatio, - @Nullable Integer maxCompactionTaskSlots + @Nullable Integer maxCompactionTaskSlots, + @Nullable Boolean useAutoScaleSlots ) { return new CoordinatorCompactionConfig( baseConfig.compactionConfigs, compactionTaskSlotRatio == null ? baseConfig.compactionTaskSlotRatio : compactionTaskSlotRatio, - maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots : maxCompactionTaskSlots + maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots : maxCompactionTaskSlots, + useAutoScaleSlots == null ? baseConfig.useAutoScaleSlots : useAutoScaleSlots ); } public static CoordinatorCompactionConfig from(List compactionConfigs) { - return new CoordinatorCompactionConfig(compactionConfigs, null, null); + return new CoordinatorCompactionConfig(compactionConfigs, null, null, null); } public static CoordinatorCompactionConfig empty() { - return new CoordinatorCompactionConfig(ImmutableList.of(), null, null); + return new CoordinatorCompactionConfig(ImmutableList.of(), null, null, null); } public static AtomicReference watch(final JacksonConfigManager configManager) @@ -113,7 +118,8 @@ public class CoordinatorCompactionConfig public CoordinatorCompactionConfig( @JsonProperty("compactionConfigs") List compactionConfigs, @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio, - @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots + @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots, + @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots ) { this.compactionConfigs = compactionConfigs; @@ -123,6 +129,9 @@ public class CoordinatorCompactionConfig this.maxCompactionTaskSlots = maxCompactionTaskSlots == null ? DEFAILT_MAX_COMPACTION_TASK_SLOTS : maxCompactionTaskSlots; + this.useAutoScaleSlots = useAutoScaleSlots == null ? + DEFAULT_USE_AUTO_SCALE_SLOTS : + useAutoScaleSlots; } @JsonProperty @@ -143,20 +152,10 @@ public class CoordinatorCompactionConfig return maxCompactionTaskSlots; } - @Override - public String toString() + @JsonProperty + public boolean isUseAutoScaleSlots() { - return "CoordinatorCompactionConfig{" + - ", compactionConfigs=" + compactionConfigs + - ", compactionTaskSlotRatio=" + compactionTaskSlotRatio + - ", maxCompactionTaskSlots=" + maxCompactionTaskSlots + - '}'; - } - - @Override - public int hashCode() - { - return Objects.hash(compactionConfigs, compactionTaskSlotRatio, maxCompactionTaskSlots); + return useAutoScaleSlots; } @Override @@ -168,16 +167,27 @@ public class CoordinatorCompactionConfig if (o == null || getClass() != o.getClass()) { return false; } - CoordinatorCompactionConfig that = (CoordinatorCompactionConfig) o; + return Double.compare(that.compactionTaskSlotRatio, compactionTaskSlotRatio) == 0 && + maxCompactionTaskSlots == that.maxCompactionTaskSlots && + useAutoScaleSlots == that.useAutoScaleSlots && + Objects.equals(compactionConfigs, that.compactionConfigs); + } - if (!Objects.equals(compactionConfigs, that.compactionConfigs)) { - return false; - } - if (compactionTaskSlotRatio != that.compactionTaskSlotRatio) { - return false; - } + @Override + public int hashCode() + { + return Objects.hash(compactionConfigs, compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots); + } - return maxCompactionTaskSlots == that.maxCompactionTaskSlots; + @Override + public String toString() + { + return "CoordinatorCompactionConfig{" + + "compactionConfigs=" + compactionConfigs + + ", compactionTaskSlotRatio=" + compactionTaskSlotRatio + + ", maxCompactionTaskSlots=" + maxCompactionTaskSlots + + ", useAutoScaleSlots=" + useAutoScaleSlots + + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index ad6566f8c62..6598a6ede18 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -174,8 +174,21 @@ public class CompactSegments implements CoordinatorDuty final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction); + int totalCapacity; + if (dynamicConfig.isUseAutoScaleSlots()) { + try { + totalCapacity = indexingServiceClient.getTotalWorkerCapacityWithAutoScale(); + } + catch (Exception e) { + LOG.warn("Failed to get total worker capacity with auto scale slots. Falling back to current capacity count"); + totalCapacity = indexingServiceClient.getTotalWorkerCapacity(); + } + } else { + totalCapacity = indexingServiceClient.getTotalWorkerCapacity(); + } + final int compactionTaskCapacity = (int) Math.min( - indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(), + totalCapacity * dynamicConfig.getCompactionTaskSlotRatio(), dynamicConfig.getMaxCompactionTaskSlots() ); final int numAvailableCompactionTaskSlots; diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index cb17c45e7c8..e791b6a49df 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -93,6 +93,7 @@ public class CoordinatorCompactionConfigsResource public Response setCompactionTaskLimit( @QueryParam("ratio") Double compactionTaskSlotRatio, @QueryParam("max") Integer maxCompactionTaskSlots, + @QueryParam("useAutoScaleSlots") Boolean useAutoScaleSlots, @HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author, @HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment, @Context HttpServletRequest req @@ -104,7 +105,8 @@ public class CoordinatorCompactionConfigsResource final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from( current, compactionTaskSlotRatio, - maxCompactionTaskSlots + maxCompactionTaskSlots, + useAutoScaleSlots ); return manager.set( diff --git a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java index 0886e124e13..445d129a11b 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java @@ -342,5 +342,36 @@ public class HttpIndexingServiceClientTest ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) captureTask.getValue(); Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds()); } + + @Test + public void testGetTotalWorkerCapacityWithAutoScale() throws Exception + { + int currentClusterCapacity = 5; + int maximumCapacityWithAutoScale = 10; + // Mock response for /druid/indexer/v1/totalWorkerCapacity + HttpResponse totalWorkerCapacityResponse = EasyMock.createMock(HttpResponse.class); + EasyMock.expect(totalWorkerCapacityResponse.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); + EasyMock.expect(totalWorkerCapacityResponse.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); + EasyMock.replay(totalWorkerCapacityResponse); + IndexingTotalWorkerCapacityInfo indexingTotalWorkerCapacityInfo = new IndexingTotalWorkerCapacityInfo(currentClusterCapacity, maximumCapacityWithAutoScale); + StringFullResponseHolder autoScaleResponseHolder = new StringFullResponseHolder( + totalWorkerCapacityResponse, + StandardCharsets.UTF_8 + ).addChunk(jsonMapper.writeValueAsString(indexingTotalWorkerCapacityInfo)); + EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class))) + .andReturn(autoScaleResponseHolder) + .once(); + EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/indexer/v1/totalWorkerCapacity")) + .andReturn(new Request( + HttpMethod.GET, + new URL("http://localhost:8090/druid/indexer/v1/totalWorkerCapacity") + )) + .once(); + EasyMock.replay(druidLeaderClient); + + final int actualResponse = httpIndexingServiceClient.getTotalWorkerCapacityWithAutoScale(); + Assert.assertEquals(maximumCapacityWithAutoScale, actualResponse); + EasyMock.verify(druidLeaderClient); + } } diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index 17cbc418e15..447a32d3b40 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -68,6 +68,12 @@ public class NoopIndexingServiceClient implements IndexingServiceClient return 0; } + @Override + public int getTotalWorkerCapacityWithAutoScale() + { + return 0; + } + @Override public String runTask(String taskId, Object taskObject) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index df818864f54..2892d44d928 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -38,6 +38,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.HttpIndexingServiceClient; +import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo; import org.apache.druid.client.indexing.IndexingWorker; import org.apache.druid.client.indexing.IndexingWorkerInfo; import org.apache.druid.client.indexing.TaskPayloadResponse; @@ -133,6 +134,7 @@ public class CompactSegmentsTest private static final int TOTAL_BYTE_PER_DATASOURCE = 440; private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44; private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11; + private static final int MAXIMUM_CAPACITY_WITH_AUTO_SCALE = 10; @Parameterized.Parameters(name = "{0}") public static Collection constructorFeeder() @@ -647,6 +649,36 @@ public class CompactSegmentsTest Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)); } + @Test + public void testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsOverMaxSlot() + { + int maxCompactionSlot = 3; + Assert.assertTrue(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE); + final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); + leaderClient.start(); + final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); + final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient); + final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true); + Assert.assertEquals(maxCompactionSlot, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)); + Assert.assertEquals(maxCompactionSlot, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)); + Assert.assertEquals(maxCompactionSlot, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)); + } + + @Test + public void testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsUnderMaxSlot() + { + int maxCompactionSlot = 100; + Assert.assertFalse(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE); + final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); + leaderClient.start(); + final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); + final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient); + final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true); + Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)); + Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)); + Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)); + } + @Test public void testCompactWithoutGranularitySpec() { @@ -1794,6 +1826,16 @@ public class CompactSegmentsTest List compactionConfigs, @Nullable Integer numCompactionTaskSlots ) + { + return doCompactSegments(compactSegments, compactionConfigs, numCompactionTaskSlots, false); + } + + private CoordinatorStats doCompactSegments( + CompactSegments compactSegments, + List compactionConfigs, + @Nullable Integer numCompactionTaskSlots, + boolean useAutoScaleSlots + ) { DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers .newBuilder() @@ -1801,8 +1843,9 @@ public class CompactSegmentsTest .withCompactionConfig( new CoordinatorCompactionConfig( compactionConfigs, - numCompactionTaskSlots == null ? null : 100., // 100% when numCompactionTaskSlots is not null - numCompactionTaskSlots + numCompactionTaskSlots == null ? null : 1., // 100% when numCompactionTaskSlots is not null + numCompactionTaskSlots, + useAutoScaleSlots ) ) .build(); @@ -1994,6 +2037,8 @@ public class CompactSegmentsTest return handleTask(request); } else if (urlString.contains("/druid/indexer/v1/workers")) { return handleWorkers(); + } else if (urlString.contains("/druid/indexer/v1/totalWorkerCapacity")) { + return handleTotalWorkerCapacity(); } else if (urlString.contains("/druid/indexer/v1/waitingTasks") || urlString.contains("/druid/indexer/v1/pendingTasks") || urlString.contains("/druid/indexer/v1/runningTasks")) { @@ -2035,6 +2080,12 @@ public class CompactSegmentsTest return createStringFullResponseHolder(jsonMapper.writeValueAsString(workerInfos)); } + private StringFullResponseHolder handleTotalWorkerCapacity() throws JsonProcessingException + { + IndexingTotalWorkerCapacityInfo info = new IndexingTotalWorkerCapacityInfo(5, 10); + return createStringFullResponseHolder(jsonMapper.writeValueAsString(info)); + } + private StringFullResponseHolder handleTask(Request request) throws IOException { final ClientTaskQuery taskQuery = jsonMapper.readValue(request.getContent().array(), ClientTaskQuery.class); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index 4f75bf75444..6a38d5bcc65 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -122,6 +122,7 @@ public class CoordinatorCompactionConfigsResourceTest Response result = coordinatorCompactionConfigsResource.setCompactionTaskLimit( compactionTaskSlotRatio, maxCompactionTaskSlots, + true, author, comment, mockHttpServletRequest @@ -131,6 +132,7 @@ public class CoordinatorCompactionConfigsResourceTest Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES); Assert.assertNotNull(newConfigCaptor.getValue()); Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots); + Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots()); Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0); } @@ -279,6 +281,7 @@ public class CoordinatorCompactionConfigsResourceTest Response result = coordinatorCompactionConfigsResource.setCompactionTaskLimit( compactionTaskSlotRatio, maxCompactionTaskSlots, + true, author, comment, mockHttpServletRequest @@ -287,6 +290,7 @@ public class CoordinatorCompactionConfigsResourceTest Assert.assertNull(oldConfigCaptor.getValue()); Assert.assertNotNull(newConfigCaptor.getValue()); Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots); + Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots()); Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0); }