Add a flag to allow auto compaction task slot ratio to consider auto scaler slots (#12228)

* add impl

* fix checkstyle

* add unit tests

* checkstyle

* add IT

* fix IT

* add comments

* fix checkstyle
This commit is contained in:
Maytas Monsereenusorn 2022-02-06 20:46:05 -08:00 committed by GitHub
parent 159f97dcb0
commit 2b8e7fc0b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 737 additions and 55 deletions

View File

@ -267,8 +267,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
remoteTaskRunnerConfig,
workerConfig,
pendingTasks,
workers,
config.getWorkerCapacityHint()
workers
);
log.debug("More workers needed: %d", moreWorkersNeeded);
@ -296,8 +295,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
final DefaultWorkerBehaviorConfig workerConfig,
final Collection<Task> pendingTasks,
final Collection<ImmutableWorkerInfo> workers,
final int workerCapacityHint
final Collection<ImmutableWorkerInfo> workers
)
{
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
@ -312,7 +310,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
}
WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy();
int need = 0;
int capacity = getExpectedWorkerCapacity(workers, workerCapacityHint);
int capacity = getExpectedWorkerCapacity(workers);
log.info("Expected worker capacity: %d", capacity);
// Simulate assigning tasks to dummy workers using configured workerSelectStrategy
@ -458,14 +456,15 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
return currValidWorkers;
}
private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers, final int workerCapacityHint)
@Override
public int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers)
{
int size = workers.size();
if (size == 0) {
// No existing workers
if (workerCapacityHint > 0) {
if (config.getWorkerCapacityHint() > 0) {
// Return workerCapacityHint if it is set in config
return workerCapacityHint;
return config.getWorkerCapacityHint();
} else {
// Assume capacity per worker as 1
return 1;

View File

@ -20,8 +20,12 @@
package org.apache.druid.indexing.overlord.autoscaling;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.TaskRunner;
import javax.annotation.Nonnull;
import java.util.Collection;
/**
* In general, the resource management is tied to the runner.
*/
@ -35,4 +39,15 @@ public interface ProvisioningStrategy<T extends TaskRunner>
* @param runner The TaskRunner state holder this strategy should use during execution
*/
ProvisioningService makeProvisioningService(T runner);
/**
* Returns the expected number of task slots available for each worker.
* This method can returns -1 if the provisioning strategy does not support getting the expected worker capacity.
*
* @return the expected number of task slots available for each worker if provisioning strategy support getting the expected worker capacity, otherwise -1
*/
default int getExpectedWorkerCapacity(@Nonnull Collection<ImmutableWorkerInfo> workers)
{
return -1;
}
}

View File

@ -45,6 +45,7 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionHolder;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
@ -53,8 +54,10 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@ -122,6 +125,7 @@ public class OverlordResource
private final AuditManager auditManager;
private final AuthorizerMapper authorizerMapper;
private final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter;
private final ProvisioningStrategy provisioningStrategy;
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete");
@ -135,7 +139,8 @@ public class OverlordResource
JacksonConfigManager configManager,
AuditManager auditManager,
AuthorizerMapper authorizerMapper,
WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter
WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter,
ProvisioningStrategy provisioningStrategy
)
{
this.taskMaster = taskMaster;
@ -146,6 +151,7 @@ public class OverlordResource
this.auditManager = auditManager;
this.authorizerMapper = authorizerMapper;
this.workerTaskRunnerQueryAdapter = workerTaskRunnerQueryAdapter;
this.provisioningStrategy = provisioningStrategy;
}
/**
@ -422,6 +428,69 @@ public class OverlordResource
return Response.ok(workerConfigRef.get()).build();
}
/**
* Gets the total worker capacity of varies states of the cluster.
*/
@GET
@Path("/totalWorkerCapacity")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(ConfigResourceFilter.class)
public Response getTotalWorkerCapacity()
{
// Calculate current cluster capacity
int currentCapacity;
Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (!taskRunnerOptional.isPresent()) {
// Cannot serve call as not leader
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
TaskRunner taskRunner = taskRunnerOptional.get();
Collection<ImmutableWorkerInfo> workers;
if (taskRunner instanceof WorkerTaskRunner) {
workers = ((WorkerTaskRunner) taskRunner).getWorkers();
currentCapacity = workers.stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
} else {
log.debug(
"Cannot calculate capacity as task runner [%s] of type [%s] does not support listing workers",
taskRunner,
taskRunner.getClass().getName()
);
workers = ImmutableList.of();
currentCapacity = -1;
}
// Calculate maximum capacity with auto scale
int maximumCapacity;
if (workerConfigRef == null) {
workerConfigRef = configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class);
}
WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
if (workerBehaviorConfig == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
maximumCapacity = -1;
} else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
maximumCapacity = -1;
} else {
int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers);
maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
}
} else {
// Auto scale is not using DefaultWorkerBehaviorConfig
log.debug("Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity",
workerBehaviorConfig,
workerBehaviorConfig.getClass().getSimpleName()
);
maximumCapacity = -1;
}
return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity)).build();
}
// default value is used for backwards compatibility
@POST
@Path("/worker")

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Should be synchronized with org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo
*/
public class TotalWorkerCapacityResponse
{
/**
* The total worker capacity of the current state of the cluster. This can be -1 if
* it cannot be determined.
*/
private final int currentClusterCapacity;
/**
* The total worker capacity of the cluster including auto scaling capability (scaling to max workers).
* This can be -1 if it cannot be determined or if auto scaling is not configured.
*/
private final int maximumCapacityWithAutoScale;
@JsonCreator
public TotalWorkerCapacityResponse(
@JsonProperty("currentClusterCapacity") int currentClusterCapacity,
@JsonProperty("maximumCapacityWithAutoScale") int maximumCapacityWithAutoScale
)
{
this.currentClusterCapacity = currentClusterCapacity;
this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale;
}
@JsonProperty
public int getCurrentClusterCapacity()
{
return currentClusterCapacity;
}
@JsonProperty
public int getMaximumCapacityWithAutoScale()
{
return maximumCapacityWithAutoScale;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TestTasks;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.RemoteTaskRunner;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.ZkWorker;
@ -54,8 +55,10 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@ -66,6 +69,7 @@ public class PendingTaskBasedProvisioningStrategyTest
{
private AutoScaler autoScaler;
private Task testTask;
private PendingTaskBasedWorkerProvisioningConfig config;
private PendingTaskBasedWorkerProvisioningStrategy strategy;
private AtomicReference<WorkerBehaviorConfig> workerConfig;
private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service");
@ -79,7 +83,7 @@ public class PendingTaskBasedProvisioningStrategyTest
testTask = TestTasks.immediateSuccess("task1");
PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
config = new PendingTaskBasedWorkerProvisioningConfig()
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(10)
.setPendingTaskTimeout(new Period(0))
@ -108,6 +112,102 @@ public class PendingTaskBasedProvisioningStrategyTest
);
}
@Test
public void testGetExpectedWorkerCapacityWithNoWorkerAndHintIsValid()
{
int capacityHint = 10;
config = new PendingTaskBasedWorkerProvisioningConfig()
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(10)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion(MIN_VERSION)
.setMaxScalingStep(2)
.setWorkerCapacityHint(capacityHint);
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
config,
DSuppliers.of(workerConfig),
new ProvisioningSchedulerConfig(),
new Supplier<ScheduledExecutorService>()
{
@Override
public ScheduledExecutorService get()
{
return executorService;
}
}
);
int expectedWorkerCapacity = strategy.getExpectedWorkerCapacity(ImmutableList.of());
Assert.assertEquals(capacityHint, expectedWorkerCapacity);
}
@Test
public void testGetExpectedWorkerCapacityWithNoWorkerAndHintIsNotValid()
{
int capacityHint = -1;
config = new PendingTaskBasedWorkerProvisioningConfig()
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(10)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion(MIN_VERSION)
.setMaxScalingStep(2)
.setWorkerCapacityHint(capacityHint);
strategy = new PendingTaskBasedWorkerProvisioningStrategy(
config,
DSuppliers.of(workerConfig),
new ProvisioningSchedulerConfig(),
new Supplier<ScheduledExecutorService>()
{
@Override
public ScheduledExecutorService get()
{
return executorService;
}
}
);
int expectedWorkerCapacity = strategy.getExpectedWorkerCapacity(ImmutableList.of());
Assert.assertEquals(1, expectedWorkerCapacity);
}
@Test
public void testGetExpectedWorkerCapacityWithSingleWorker()
{
int workerCapacity = 3;
Collection<ImmutableWorkerInfo> workerInfoCollection = ImmutableList.of(
new ImmutableWorkerInfo(
new Worker("http", "localhost0", "localhost0", workerCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
)
);
int expectedWorkerCapacity = strategy.getExpectedWorkerCapacity(workerInfoCollection);
Assert.assertEquals(workerCapacity, expectedWorkerCapacity);
}
@Test
public void testGetExpectedWorkerCapacityWithMultipleWorker()
{
int workerOneCapacity = 3;
int workerTwoCapacity = 6;
Collection<ImmutableWorkerInfo> workerInfoCollection = ImmutableList.of(
new ImmutableWorkerInfo(
new Worker("http", "localhost0", "localhost0", workerOneCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
),
new ImmutableWorkerInfo(
new Worker("http", "localhost0", "localhost0", workerTwoCapacity + 3, "v1", WorkerConfig.DEFAULT_CATEGORY), 0,
new HashSet<>(),
new HashSet<>(),
DateTimes.nowUtc()
)
);
int expectedWorkerCapacity = strategy.getExpectedWorkerCapacity(workerInfoCollection);
// Use capacity of the first worker in the list
Assert.assertEquals(workerOneCapacity, expectedWorkerCapacity);
}
@Test
public void testFailIfMinWorkerIsZeroAndWorkerHintNotSet()
{

View File

@ -24,7 +24,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
@ -37,13 +39,21 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
@ -79,11 +89,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
public class OverlordResourceTest
{
private OverlordResource overlordResource;
private TaskMaster taskMaster;
private JacksonConfigManager configManager;
private ProvisioningStrategy provisioningStrategy;
private TaskStorageQueryAdapter taskStorageQueryAdapter;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
private HttpServletRequest req;
@ -97,6 +110,8 @@ public class OverlordResourceTest
public void setUp()
{
taskRunner = EasyMock.createMock(TaskRunner.class);
configManager = EasyMock.createMock(JacksonConfigManager.class);
provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
taskMaster = EasyMock.createStrictMock(TaskMaster.class);
taskStorageQueryAdapter = EasyMock.createStrictMock(TaskStorageQueryAdapter.class);
indexerMetadataStorageAdapter = EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
@ -145,10 +160,11 @@ public class OverlordResourceTest
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
null,
null,
configManager,
null,
authMapper,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
provisioningStrategy
);
}
@ -1447,6 +1463,182 @@ public class OverlordResourceTest
Assert.assertEquals(ImmutableMap.of("error", "Worker API returns error!"), response.getEntity());
}
@Test
public void testGetTotalWorkerCapacityNotLeader()
{
EasyMock.reset(taskMaster);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.absent()
).anyTimes();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus());
}
@Test
public void testGetTotalWorkerCapacityWithUnknown()
{
WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@Test
public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfigNotConfigured()
{
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(null);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@Test
public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigured()
{
DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, null);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@Test
public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategySupportExpectedWorkerCapacity()
{
int expectedWorkerCapacity = 3;
int maxNumWorkers = 2;
WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class);
Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", expectedWorkerCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z")
)
);
EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
EasyMock.reset(taskMaster);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.of(workerTaskRunner)
).anyTimes();
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(expectedWorkerCapacity).anyTimes();
AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.replay(
workerTaskRunner,
autoScaler,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager,
provisioningStrategy
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@Test
public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity()
{
int invalidExpectedCapacity = -1;
int maxNumWorkers = 2;
WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class);
Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", 3, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z")
)
);
EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
EasyMock.reset(taskMaster);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.of(workerTaskRunner)
).anyTimes();
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes();
AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.replay(
workerTaskRunner,
autoScaler,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager,
provisioningStrategy
);
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(), ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
private void expectAuthorizationTokenCheck()
{
expectAuthorizationTokenCheck(Users.DRUID);

View File

@ -229,7 +229,8 @@ public class OverlordTest
null,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
workerTaskRunnerQueryAdapter
workerTaskRunnerQueryAdapter,
null
);
Response response = overlordResource.getLeader();
Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());

View File

@ -145,12 +145,21 @@ public class CompactionResourceTestClient
}
}
public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer maxCompactionTaskSlots) throws Exception
public void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception
{
String url = StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s",
getCoordinatorURL(),
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
StringUtils.urlEncode(maxCompactionTaskSlots.toString()));
String url;
if (useAutoScaleSlots == null) {
url = StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s",
getCoordinatorURL(),
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
StringUtils.urlEncode(maxCompactionTaskSlots.toString()));
} else {
url = StringUtils.format("%sconfig/compaction/taskslots?ratio=%s&max=%s&useAutoScaleSlots=%s",
getCoordinatorURL(),
StringUtils.urlEncode(compactionTaskSlotRatio.toString()),
StringUtils.urlEncode(maxCompactionTaskSlots.toString()),
StringUtils.urlEncode(useAutoScaleSlots.toString()));
}
StatusResponseHolder response = httpClient.go(new Request(HttpMethod.POST, new URL(url)), responseHandler).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(

View File

@ -306,7 +306,7 @@ public class ITAutoCompactionLockContentionTest extends AbstractKafkaIndexingSer
{
final DataSourceCompactionConfig compactionConfig = CompactionUtil
.createCompactionConfig(fullDatasourceName, Specs.MAX_ROWS_PER_SEGMENT, Period.ZERO);
compactionResource.updateCompactionTaskSlot(0.5, 10);
compactionResource.updateCompactionTaskSlot(0.5, 10, null);
compactionResource.submitCompactionConfig(compactionConfig);
// Wait for compaction config to persist

View File

@ -102,7 +102,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
public void setup() throws Exception
{
// Set comapction slot to 5
updateCompactionTaskSlot(0.5, 10);
updateCompactionTaskSlot(0.5, 10, null);
fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
}
@ -235,7 +235,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
{
// Set compactionTaskSlotRatio to 0 to prevent any compaction
updateCompactionTaskSlot(0, 0);
updateCompactionTaskSlot(0, 0, null);
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
@ -252,7 +252,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
checkCompactionIntervals(intervalsBeforeCompaction);
Assert.assertNull(compactionResource.getCompactionStatus(fullDatasourceName));
// Update compaction slots to be 1
updateCompactionTaskSlot(1, 1);
updateCompactionTaskSlot(1, 1, null);
// One day compacted (1 new segment) and one day remains uncompacted. (3 total)
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
@ -898,6 +898,24 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
@Test
public void testUpdateCompactionTaskSlotWithUseAutoScaleSlots() throws Exception
{
// First try update without useAutoScaleSlots
updateCompactionTaskSlot(3, 5, null);
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
// Should be default value which is false
Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots());
// Now try update from default value to useAutoScaleSlots=true
updateCompactionTaskSlot(3, 5, true);
coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
Assert.assertTrue(coordinatorCompactionConfig.isUseAutoScaleSlots());
// Now try update from useAutoScaleSlots=true to useAutoScaleSlots=false
updateCompactionTaskSlot(3, 5, false);
coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
Assert.assertFalse(coordinatorCompactionConfig.isUseAutoScaleSlots());
}
private void loadData(String indexTask) throws Exception
{
loadData(indexTask, ImmutableMap.of());
@ -1124,13 +1142,16 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCompactionTaskSlots) throws Exception
private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception
{
compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, maxCompactionTaskSlots);
compactionResource.updateCompactionTaskSlot(compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots);
// Verify that the compaction config is updated correctly.
CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
Assert.assertEquals(coordinatorCompactionConfig.getCompactionTaskSlotRatio(), compactionTaskSlotRatio);
Assert.assertEquals(coordinatorCompactionConfig.getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
if (useAutoScaleSlots != null) {
Assert.assertEquals(coordinatorCompactionConfig.isUseAutoScaleSlots(), useAutoScaleSlots.booleanValue());
}
}
private void getAndAssertCompactionStatus(

View File

@ -220,6 +220,32 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
}
}
@Override
public int getTotalWorkerCapacityWithAutoScale()
{
try {
final StringFullResponseHolder response = druidLeaderClient.go(
druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/indexer/v1/totalWorkerCapacity")
.setHeader("Content-Type", MediaType.APPLICATION_JSON)
);
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while getting total worker capacity. status[%s] content[%s]",
response.getStatus(),
response.getContent()
);
}
final IndexingTotalWorkerCapacityInfo indexingTotalWorkerCapacityInfo = jsonMapper.readValue(
response.getContent(),
new TypeReference<IndexingTotalWorkerCapacityInfo>() {}
);
return indexingTotalWorkerCapacityInfo.getMaximumCapacityWithAutoScale();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public List<TaskStatusPlus> getActiveTasks()
{

View File

@ -49,8 +49,17 @@ public interface IndexingServiceClient
@Nullable Map<String, Object> context
);
/**
* Gets the total worker capacity of the current state of the cluster. This can be -1 if it cannot be determined.
*/
int getTotalWorkerCapacity();
/**
* Gets the total worker capacity of the cluster including auto scaling capability (scaling to max workers).
* This can be -1 if it cannot be determined or if auto scaling is not configured.
*/
int getTotalWorkerCapacityWithAutoScale();
String runTask(String taskId, Object taskObject);
String cancelTask(String taskId);

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Should be synchronized with org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse
*/
public class IndexingTotalWorkerCapacityInfo
{
/**
* The total worker capacity of the current state of the cluster. This can be -1 if
* it cannot be determined.
*/
private final int currentClusterCapacity;
/**
* The total worker capacity of the cluster including auto scaling capability (scaling to max workers).
* This can be -1 if it cannot be determined or if auto scaling is not configured.
*/
private final int maximumCapacityWithAutoScale;
@JsonCreator
public IndexingTotalWorkerCapacityInfo(
@JsonProperty("currentClusterCapacity") int currentClusterCapacity,
@JsonProperty("maximumCapacityWithAutoScale") int maximumCapacityWithAutoScale
)
{
this.currentClusterCapacity = currentClusterCapacity;
this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale;
}
@JsonProperty
public int getCurrentClusterCapacity()
{
return currentClusterCapacity;
}
@JsonProperty
public int getMaximumCapacityWithAutoScale()
{
return maximumCapacityWithAutoScale;
}
}

View File

@ -39,10 +39,12 @@ public class CoordinatorCompactionConfig
private static final double DEFAULT_COMPACTION_TASK_RATIO = 0.1;
private static final int DEFAILT_MAX_COMPACTION_TASK_SLOTS = Integer.MAX_VALUE;
private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false;
private final List<DataSourceCompactionConfig> compactionConfigs;
private final double compactionTaskSlotRatio;
private final int maxCompactionTaskSlots;
private final boolean useAutoScaleSlots;
public static CoordinatorCompactionConfig from(
CoordinatorCompactionConfig baseConfig,
@ -52,31 +54,34 @@ public class CoordinatorCompactionConfig
return new CoordinatorCompactionConfig(
compactionConfigs,
baseConfig.compactionTaskSlotRatio,
baseConfig.maxCompactionTaskSlots
baseConfig.maxCompactionTaskSlots,
baseConfig.useAutoScaleSlots
);
}
public static CoordinatorCompactionConfig from(
CoordinatorCompactionConfig baseConfig,
@Nullable Double compactionTaskSlotRatio,
@Nullable Integer maxCompactionTaskSlots
@Nullable Integer maxCompactionTaskSlots,
@Nullable Boolean useAutoScaleSlots
)
{
return new CoordinatorCompactionConfig(
baseConfig.compactionConfigs,
compactionTaskSlotRatio == null ? baseConfig.compactionTaskSlotRatio : compactionTaskSlotRatio,
maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots : maxCompactionTaskSlots
maxCompactionTaskSlots == null ? baseConfig.maxCompactionTaskSlots : maxCompactionTaskSlots,
useAutoScaleSlots == null ? baseConfig.useAutoScaleSlots : useAutoScaleSlots
);
}
public static CoordinatorCompactionConfig from(List<DataSourceCompactionConfig> compactionConfigs)
{
return new CoordinatorCompactionConfig(compactionConfigs, null, null);
return new CoordinatorCompactionConfig(compactionConfigs, null, null, null);
}
public static CoordinatorCompactionConfig empty()
{
return new CoordinatorCompactionConfig(ImmutableList.of(), null, null);
return new CoordinatorCompactionConfig(ImmutableList.of(), null, null, null);
}
public static AtomicReference<CoordinatorCompactionConfig> watch(final JacksonConfigManager configManager)
@ -113,7 +118,8 @@ public class CoordinatorCompactionConfig
public CoordinatorCompactionConfig(
@JsonProperty("compactionConfigs") List<DataSourceCompactionConfig> compactionConfigs,
@JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio,
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots
@JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots,
@JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots
)
{
this.compactionConfigs = compactionConfigs;
@ -123,6 +129,9 @@ public class CoordinatorCompactionConfig
this.maxCompactionTaskSlots = maxCompactionTaskSlots == null ?
DEFAILT_MAX_COMPACTION_TASK_SLOTS :
maxCompactionTaskSlots;
this.useAutoScaleSlots = useAutoScaleSlots == null ?
DEFAULT_USE_AUTO_SCALE_SLOTS :
useAutoScaleSlots;
}
@JsonProperty
@ -143,20 +152,10 @@ public class CoordinatorCompactionConfig
return maxCompactionTaskSlots;
}
@Override
public String toString()
@JsonProperty
public boolean isUseAutoScaleSlots()
{
return "CoordinatorCompactionConfig{" +
", compactionConfigs=" + compactionConfigs +
", compactionTaskSlotRatio=" + compactionTaskSlotRatio +
", maxCompactionTaskSlots=" + maxCompactionTaskSlots +
'}';
}
@Override
public int hashCode()
{
return Objects.hash(compactionConfigs, compactionTaskSlotRatio, maxCompactionTaskSlots);
return useAutoScaleSlots;
}
@Override
@ -168,16 +167,27 @@ public class CoordinatorCompactionConfig
if (o == null || getClass() != o.getClass()) {
return false;
}
CoordinatorCompactionConfig that = (CoordinatorCompactionConfig) o;
return Double.compare(that.compactionTaskSlotRatio, compactionTaskSlotRatio) == 0 &&
maxCompactionTaskSlots == that.maxCompactionTaskSlots &&
useAutoScaleSlots == that.useAutoScaleSlots &&
Objects.equals(compactionConfigs, that.compactionConfigs);
}
if (!Objects.equals(compactionConfigs, that.compactionConfigs)) {
return false;
}
if (compactionTaskSlotRatio != that.compactionTaskSlotRatio) {
return false;
}
@Override
public int hashCode()
{
return Objects.hash(compactionConfigs, compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots);
}
return maxCompactionTaskSlots == that.maxCompactionTaskSlots;
@Override
public String toString()
{
return "CoordinatorCompactionConfig{" +
"compactionConfigs=" + compactionConfigs +
", compactionTaskSlotRatio=" + compactionTaskSlotRatio +
", maxCompactionTaskSlots=" + maxCompactionTaskSlots +
", useAutoScaleSlots=" + useAutoScaleSlots +
'}';
}
}

View File

@ -174,8 +174,21 @@ public class CompactSegments implements CoordinatorDuty
final CompactionSegmentIterator iterator =
policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction);
int totalCapacity;
if (dynamicConfig.isUseAutoScaleSlots()) {
try {
totalCapacity = indexingServiceClient.getTotalWorkerCapacityWithAutoScale();
}
catch (Exception e) {
LOG.warn("Failed to get total worker capacity with auto scale slots. Falling back to current capacity count");
totalCapacity = indexingServiceClient.getTotalWorkerCapacity();
}
} else {
totalCapacity = indexingServiceClient.getTotalWorkerCapacity();
}
final int compactionTaskCapacity = (int) Math.min(
indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(),
totalCapacity * dynamicConfig.getCompactionTaskSlotRatio(),
dynamicConfig.getMaxCompactionTaskSlots()
);
final int numAvailableCompactionTaskSlots;

View File

@ -93,6 +93,7 @@ public class CoordinatorCompactionConfigsResource
public Response setCompactionTaskLimit(
@QueryParam("ratio") Double compactionTaskSlotRatio,
@QueryParam("max") Integer maxCompactionTaskSlots,
@QueryParam("useAutoScaleSlots") Boolean useAutoScaleSlots,
@HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
@HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment,
@Context HttpServletRequest req
@ -104,7 +105,8 @@ public class CoordinatorCompactionConfigsResource
final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from(
current,
compactionTaskSlotRatio,
maxCompactionTaskSlots
maxCompactionTaskSlots,
useAutoScaleSlots
);
return manager.set(

View File

@ -342,5 +342,36 @@ public class HttpIndexingServiceClientTest
ClientCompactionTaskQuery taskQuery = (ClientCompactionTaskQuery) captureTask.getValue();
Assert.assertNull(taskQuery.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds());
}
@Test
public void testGetTotalWorkerCapacityWithAutoScale() throws Exception
{
int currentClusterCapacity = 5;
int maximumCapacityWithAutoScale = 10;
// Mock response for /druid/indexer/v1/totalWorkerCapacity
HttpResponse totalWorkerCapacityResponse = EasyMock.createMock(HttpResponse.class);
EasyMock.expect(totalWorkerCapacityResponse.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(totalWorkerCapacityResponse.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
EasyMock.replay(totalWorkerCapacityResponse);
IndexingTotalWorkerCapacityInfo indexingTotalWorkerCapacityInfo = new IndexingTotalWorkerCapacityInfo(currentClusterCapacity, maximumCapacityWithAutoScale);
StringFullResponseHolder autoScaleResponseHolder = new StringFullResponseHolder(
totalWorkerCapacityResponse,
StandardCharsets.UTF_8
).addChunk(jsonMapper.writeValueAsString(indexingTotalWorkerCapacityInfo));
EasyMock.expect(druidLeaderClient.go(EasyMock.anyObject(Request.class)))
.andReturn(autoScaleResponseHolder)
.once();
EasyMock.expect(druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/indexer/v1/totalWorkerCapacity"))
.andReturn(new Request(
HttpMethod.GET,
new URL("http://localhost:8090/druid/indexer/v1/totalWorkerCapacity")
))
.once();
EasyMock.replay(druidLeaderClient);
final int actualResponse = httpIndexingServiceClient.getTotalWorkerCapacityWithAutoScale();
Assert.assertEquals(maximumCapacityWithAutoScale, actualResponse);
EasyMock.verify(druidLeaderClient);
}
}

View File

@ -68,6 +68,12 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
return 0;
}
@Override
public int getTotalWorkerCapacityWithAutoScale()
{
return 0;
}
@Override
public String runTask(String taskId, Object taskObject)
{

View File

@ -38,6 +38,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
@ -133,6 +134,7 @@ public class CompactSegmentsTest
private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11;
private static final int MAXIMUM_CAPACITY_WITH_AUTO_SCALE = 10;
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
@ -647,6 +649,36 @@ public class CompactSegmentsTest
Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
}
@Test
public void testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsOverMaxSlot()
{
int maxCompactionSlot = 3;
Assert.assertTrue(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true);
Assert.assertEquals(maxCompactionSlot, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
Assert.assertEquals(maxCompactionSlot, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
Assert.assertEquals(maxCompactionSlot, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
}
@Test
public void testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsUnderMaxSlot()
{
int maxCompactionSlot = 100;
Assert.assertFalse(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE);
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true);
Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
}
@Test
public void testCompactWithoutGranularitySpec()
{
@ -1794,6 +1826,16 @@ public class CompactSegmentsTest
List<DataSourceCompactionConfig> compactionConfigs,
@Nullable Integer numCompactionTaskSlots
)
{
return doCompactSegments(compactSegments, compactionConfigs, numCompactionTaskSlots, false);
}
private CoordinatorStats doCompactSegments(
CompactSegments compactSegments,
List<DataSourceCompactionConfig> compactionConfigs,
@Nullable Integer numCompactionTaskSlots,
boolean useAutoScaleSlots
)
{
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
.newBuilder()
@ -1801,8 +1843,9 @@ public class CompactSegmentsTest
.withCompactionConfig(
new CoordinatorCompactionConfig(
compactionConfigs,
numCompactionTaskSlots == null ? null : 100., // 100% when numCompactionTaskSlots is not null
numCompactionTaskSlots
numCompactionTaskSlots == null ? null : 1., // 100% when numCompactionTaskSlots is not null
numCompactionTaskSlots,
useAutoScaleSlots
)
)
.build();
@ -1994,6 +2037,8 @@ public class CompactSegmentsTest
return handleTask(request);
} else if (urlString.contains("/druid/indexer/v1/workers")) {
return handleWorkers();
} else if (urlString.contains("/druid/indexer/v1/totalWorkerCapacity")) {
return handleTotalWorkerCapacity();
} else if (urlString.contains("/druid/indexer/v1/waitingTasks")
|| urlString.contains("/druid/indexer/v1/pendingTasks")
|| urlString.contains("/druid/indexer/v1/runningTasks")) {
@ -2035,6 +2080,12 @@ public class CompactSegmentsTest
return createStringFullResponseHolder(jsonMapper.writeValueAsString(workerInfos));
}
private StringFullResponseHolder handleTotalWorkerCapacity() throws JsonProcessingException
{
IndexingTotalWorkerCapacityInfo info = new IndexingTotalWorkerCapacityInfo(5, 10);
return createStringFullResponseHolder(jsonMapper.writeValueAsString(info));
}
private StringFullResponseHolder handleTask(Request request) throws IOException
{
final ClientTaskQuery taskQuery = jsonMapper.readValue(request.getContent().array(), ClientTaskQuery.class);

View File

@ -122,6 +122,7 @@ public class CoordinatorCompactionConfigsResourceTest
Response result = coordinatorCompactionConfigsResource.setCompactionTaskLimit(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
true,
author,
comment,
mockHttpServletRequest
@ -131,6 +132,7 @@ public class CoordinatorCompactionConfigsResourceTest
Assert.assertEquals(oldConfigCaptor.getValue(), OLD_CONFIG_IN_BYTES);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
}
@ -279,6 +281,7 @@ public class CoordinatorCompactionConfigsResourceTest
Response result = coordinatorCompactionConfigsResource.setCompactionTaskLimit(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
true,
author,
comment,
mockHttpServletRequest
@ -287,6 +290,7 @@ public class CoordinatorCompactionConfigsResourceTest
Assert.assertNull(oldConfigCaptor.getValue());
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
Assert.assertTrue(newConfigCaptor.getValue().isUseAutoScaleSlots());
Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
}