Fix capacity response in mm-less ingestion (#14888)

Changes:
- Fix the capacity response for mm-less (MiddleManager-less) ingestion.
- Add the field `usedClusterCapacity` to the GET /totalWorkerCapacity response.
  This API should be used to get the total ingestion capacity on the Overlord.
- Remove the method `isK8sTaskRunner` from the interface `TaskRunner`.
This commit is contained in:
George Shiqi Wu 2023-08-24 22:47:38 -04:00 committed by GitHub
parent e51181957c
commit ad32f84586
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 121 additions and 62 deletions

View File

@ -396,12 +396,6 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
return Collections.emptyMap();
}
/**
 * Reports that this runner is the Kubernetes task runner.
 *
 * @return always {@code true}
 */
@Override
public boolean isK8sTaskRunner()
{
return true;
}
@Override
public void unregisterListener(String listenerId)
{
@ -457,4 +451,16 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
return workItem.getRunnerTaskState();
}
@Override
public int getTotalCapacity()
{
return config.getCapacity();
}
@Override
public int getUsedCapacity()
{
return tasks.size();
}
}

View File

@ -628,4 +628,22 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
verifyAll();
}
@Test
public void test_getTotalCapacity()
{
  // The runner under test is expected to report a total capacity of exactly one.
  final int totalCapacity = runner.getTotalCapacity();
  Assert.assertEquals(1, totalCapacity);
}
@Test
public void test_getUsedCapacity()
{
  // Nothing is tracked yet, so no capacity is reported as used.
  Assert.assertEquals(0, runner.getUsedCapacity());

  // Tracking a single work item must be reflected as one used slot.
  final String taskId = task.getId();
  runner.tasks.put(taskId, new KubernetesWorkItem(task, null));
  Assert.assertEquals(1, runner.getUsedCapacity());

  // Removing the work item must bring the used capacity back to zero.
  runner.tasks.remove(taskId);
  Assert.assertEquals(0, runner.getUsedCapacity());
}
}

View File

@ -1641,4 +1641,16 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return totalBlacklistedPeons;
}
/**
 * Adds up the capacity of the {@code Worker} behind every entry returned by {@code getWorkers()}.
 *
 * @return the sum of the per-worker capacities, or 0 when there are no workers
 */
@Override
public int getTotalCapacity()
{
  int totalCapacity = 0;
  for (ImmutableWorkerInfo workerInfo : getWorkers()) {
    totalCapacity += workerInfo.getWorker().getCapacity();
  }
  return totalCapacity;
}
/**
 * Adds up the currently used capacity of every entry returned by {@code getWorkers()}.
 *
 * @return the sum of {@code getCurrCapacityUsed()} over all workers, or 0 when there are no workers
 */
@Override
public int getUsedCapacity()
{
  int usedCapacity = 0;
  for (ImmutableWorkerInfo workerInfo : getWorkers()) {
    usedCapacity += workerInfo.getCurrCapacityUsed();
  }
  return usedCapacity;
}
}

View File

@ -136,14 +136,6 @@ public interface TaskRunner
Map<String, Long> getBlacklistedTaskSlotCount();
/**
 * Because the k8s task runner is an extension, we need to know the task runner type in the overlord resource.
 *
 * @return {@code false} by default; an implementation can override this to identify itself
 *         as the k8s task runner
 */
default boolean isK8sTaskRunner()
{
return false;
}
default void updateStatus(Task task, TaskStatus status)
{
// do nothing
@ -154,5 +146,22 @@ public interface TaskRunner
// do nothing
}
/**
 * The maximum number of tasks this TaskRunner can run concurrently.
 * Can return -1 if this method is not implemented or the capacity can't be found.
 *
 * @return -1 in this default implementation
 */
default int getTotalCapacity()
{
return -1;
}
/**
 * The current number of tasks this TaskRunner is running.
 * Can return -1 if this method is not implemented or the number of tasks can't be found.
 *
 * @return -1 in this default implementation
 */
default int getUsedCapacity()
{
return -1;
}
}

View File

@ -1779,6 +1779,18 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return totalBlacklistedPeons;
}
/**
 * Adds up the capacity of the {@code Worker} behind every entry returned by {@code getWorkers()}.
 *
 * @return the sum of the per-worker capacities, or 0 when there are no workers
 */
@Override
public int getTotalCapacity()
{
  int capacitySum = 0;
  for (ImmutableWorkerInfo info : getWorkers()) {
    capacitySum += info.getWorker().getCapacity();
  }
  return capacitySum;
}
/**
 * Adds up the currently used capacity of every entry returned by {@code getWorkers()}.
 *
 * @return the sum of {@code getCurrCapacityUsed()} over all workers, or 0 when there are no workers
 */
@Override
public int getUsedCapacity()
{
  int usedSum = 0;
  for (ImmutableWorkerInfo info : getWorkers()) {
    usedSum += info.getCurrCapacityUsed();
  }
  return usedSum;
}
private static class HttpRemoteTaskRunnerWorkItem extends RemoteTaskRunnerWorkItem
{
enum State

View File

@ -32,8 +32,6 @@ import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.common.exception.DruidException;
@ -469,27 +467,17 @@ public class OverlordResource
public Response getTotalWorkerCapacity()
{
// Calculate current cluster capacity
int currentCapacity;
Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (!taskRunnerOptional.isPresent()) {
// Cannot serve call as not leader
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
TaskRunner taskRunner = taskRunnerOptional.get();
Collection<ImmutableWorkerInfo> workers;
if (taskRunner instanceof WorkerTaskRunner) {
workers = ((WorkerTaskRunner) taskRunner).getWorkers();
currentCapacity = workers.stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
} else {
log.debug(
"Cannot calculate capacity as task runner [%s] of type [%s] does not support listing workers",
taskRunner,
taskRunner.getClass().getName()
);
workers = ImmutableList.of();
currentCapacity = -1;
}
Collection<ImmutableWorkerInfo> workers = taskRunner instanceof WorkerTaskRunner ?
((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of();
int currentCapacity = taskRunner.getTotalCapacity();
int usedCapacity = taskRunner.getUsedCapacity();
// Calculate maximum capacity with auto scale
int maximumCapacity;
if (workerConfigRef == null) {
@ -520,7 +508,7 @@ public class OverlordResource
);
maximumCapacity = -1;
}
return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity)).build();
return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity)).build();
}
// default value is used for backwards compatibility
@ -939,24 +927,6 @@ public class OverlordResource
{
if (taskRunner instanceof WorkerTaskRunner) {
return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build();
} else if (taskRunner.isK8sTaskRunner()) {
// required because kubernetes task runner has no concept of a worker, so returning a dummy worker.
return Response.ok(ImmutableList.of(
new IndexingWorkerInfo(
new IndexingWorker(
"http",
"host",
"8100",
taskRunner.getTotalTaskSlotCount().getOrDefault("taskQueue", 0L).intValue(),
"version"
),
0,
Collections.emptySet(),
Collections.emptyList(),
DateTimes.EPOCH,
null
)
)).build();
} else {
log.debug(
"Task runner [%s] of type [%s] does not support listing workers",

View File

@ -37,15 +37,22 @@ public class TotalWorkerCapacityResponse
* This can be -1 if it cannot be determined or if auto scaling is not configured.
*/
private final int maximumCapacityWithAutoScale;
/**
* Used cluster capacity of the current state of the cluster. This can be -1 if
* it cannot be determined.
*/
private final int usedClusterCapacity;
/**
 * Creates a response from the three capacity values; each argument is bound to the JSON
 * property named in its {@code @JsonProperty} annotation and stored unchanged.
 *
 * @param currentClusterCapacity       value stored as {@code currentClusterCapacity}
 * @param maximumCapacityWithAutoScale value stored as {@code maximumCapacityWithAutoScale}
 * @param usedClusterCapacity          value stored as {@code usedClusterCapacity}
 */
@JsonCreator
public TotalWorkerCapacityResponse(
@JsonProperty("currentClusterCapacity") int currentClusterCapacity,
@JsonProperty("maximumCapacityWithAutoScale") int maximumCapacityWithAutoScale
@JsonProperty("maximumCapacityWithAutoScale") int maximumCapacityWithAutoScale,
@JsonProperty("usedClusterCapacity") int usedClusterCapacity
)
{
this.currentClusterCapacity = currentClusterCapacity;
this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale;
this.usedClusterCapacity = usedClusterCapacity;
}
@JsonProperty
@ -59,4 +66,10 @@ public class TotalWorkerCapacityResponse
{
return maximumCapacityWithAutoScale;
}
/**
 * Exposes the used cluster capacity held by this response as a JSON property.
 *
 * @return the stored {@code usedClusterCapacity} value
 */
@JsonProperty
public int getUsedClusterCapacity()
{
  return this.usedClusterCapacity;
}
}

View File

@ -147,6 +147,9 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity());
Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity());
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
@ -164,6 +167,8 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity());
Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity());
}
@Test

View File

@ -146,6 +146,8 @@ public class HttpRemoteTaskRunnerTest
Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size());
Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
Assert.assertEquals(4, taskRunner.getTotalCapacity());
Assert.assertEquals(0, taskRunner.getUsedCapacity());
}
/*

View File

@ -1331,6 +1331,8 @@ public class OverlordResourceTest
WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.replay(
taskRunner,
taskMaster,
@ -1344,6 +1346,7 @@ public class OverlordResourceTest
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@ -1352,6 +1355,8 @@ public class OverlordResourceTest
{
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(null);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.replay(
taskRunner,
taskMaster,
@ -1365,6 +1370,7 @@ public class OverlordResourceTest
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@ -1374,6 +1380,8 @@ public class OverlordResourceTest
DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, null);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.replay(
taskRunner,
taskMaster,
@ -1387,6 +1395,7 @@ public class OverlordResourceTest
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@ -1408,6 +1417,9 @@ public class OverlordResourceTest
)
);
EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity);
EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(0);
EasyMock.reset(taskMaster);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.of(workerTaskRunner)
@ -1435,6 +1447,7 @@ public class OverlordResourceTest
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(0, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}
@ -1442,20 +1455,24 @@ public class OverlordResourceTest
public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity()
{
int invalidExpectedCapacity = -1;
int currentTotalCapacity = 3;
int currentCapacityUsed = 2;
int maxNumWorkers = 2;
WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class);
Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", 3, "v1", WorkerConfig.DEFAULT_CATEGORY
"http", "testWorker", "192.0.0.1", currentTotalCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
currentCapacityUsed,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z")
)
);
EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(currentTotalCapacity);
EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(currentCapacityUsed);
EasyMock.reset(taskMaster);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.of(workerTaskRunner)
@ -1484,6 +1501,8 @@ public class OverlordResourceTest
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(), ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
Assert.assertEquals(currentTotalCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(currentCapacityUsed, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
}
@Test

View File

@ -16,24 +16,17 @@
* limitations under the License.
*/
import { sum } from 'd3-array';
import type { CapacityInfo } from '../druid-models';
import { Api } from '../singletons';
export async function getClusterCapacity(): Promise<CapacityInfo> {
const workersResponse = await Api.instance.get('/druid/indexer/v1/workers', {
const workersResponse = await Api.instance.get('/druid/indexer/v1/totalWorkerCapacity', {
timeout: 5000,
});
const usedTaskSlots = sum(
workersResponse.data,
(workerInfo: any) => Number(workerInfo.currCapacityUsed) || 0,
);
const usedTaskSlots = Number(workersResponse.data.usedClusterCapacity);
const totalTaskSlots = sum(workersResponse.data, (workerInfo: any) =>
Number(workerInfo.worker.capacity),
);
const totalTaskSlots = Number(workersResponse.data.currentClusterCapacity);
return {
availableTaskSlots: totalTaskSlots - usedTaskSlots,