Add indexer level task metrics to provide more visibility in the task distribution (#15991)

Changes:

Add the following indexer level task metrics:
- `worker/task/running/count`
- `worker/task/assigned/count`
- `worker/task/completed/count`

These metrics provide more visibility into the task distribution across indexers.
(We often see task skew across indexers, and with these metrics it is easier
to catch the imbalance.)
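
To actually see these metrics from an Indexer, the `WorkerTaskCountStatsMonitor` has to be listed in the service's monitors. A minimal sketch of the runtime property (assuming the standard fully qualified monitor class name) is:

```properties
# Sketch: enable the monitor on the Indexer so the worker/task/* metrics above are emitted.
druid.monitoring.monitors=["org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor"]
```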
Rushikesh Bankar 2024-03-21 11:08:01 +05:30 committed by GitHub
parent e769ec7b96
commit 3d8b0ffae8
8 changed files with 228 additions and 11 deletions

View File

@@ -294,6 +294,9 @@ If the JVM does not support CPU time measurement for the current thread, `ingest
|`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included, and is only supported for Middle Manager nodes.| `category`, `workerVersion`|Varies|
|`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies|
|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies|
|`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/running/count`|Number of tasks running on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
## Shuffle metrics (Native parallel task)

View File

@@ -71,6 +71,9 @@
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
"worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" },
"worker/task/success/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, "worker/task/success/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" },
"worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, "worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" },

View File

@@ -51,12 +51,16 @@ import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider;
import org.apache.druid.utils.CollectionUtils;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -67,6 +71,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -76,7 +81,7 @@ import java.util.stream.Collectors;
* starts running and completed task on disk is deleted based on a periodic schedule where overlord is asked for
* active tasks to see which completed tasks are safe to delete.
*/
public class WorkerTaskManager implements IndexerTaskCountStatsProvider
{
private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class);
@@ -607,6 +612,34 @@ public class WorkerTaskManager
completedTasks.put(taskId, taskAnnouncement);
}
private <T> Map<String, Long> getNumTasksPerDatasource(Collection<T> taskList, Function<T, String> getDataSourceFunc)
{
final Map<String, Long> dataSourceToTaskCount = new HashMap<>();
for (T task : taskList) {
dataSourceToTaskCount.merge(getDataSourceFunc.apply(task), 1L, Long::sum);
}
return dataSourceToTaskCount;
}
@Override
public Map<String, Long> getWorkerRunningTasks()
{
return getNumTasksPerDatasource(CollectionUtils.mapValues(runningTasks, detail -> detail.task).values(), Task::getDataSource);
}
@Override
public Map<String, Long> getWorkerAssignedTasks()
{
return getNumTasksPerDatasource(assignedTasks.values(), Task::getDataSource);
}
@Override
public Map<String, Long> getWorkerCompletedTasks()
{
return getNumTasksPerDatasource(this.getCompletedTasks().values(), TaskAnnouncement::getTaskDataSource);
}
private static class TaskDetails
{
private final Task task;
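
Aside for readers skimming the diff: the new `getNumTasksPerDatasource` helper above is a plain group-by-count over the worker's task maps. An equivalent formulation with the Streams API (a sketch only, not what the commit uses) would be:

```java
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch only: same per-datasource counting as getNumTasksPerDatasource,
// expressed with Collectors.groupingBy + counting.
final class TaskCountsSketch
{
  static <T> Map<String, Long> countByDataSource(Collection<T> tasks, Function<T, String> dataSourceOf)
  {
    return tasks.stream().collect(Collectors.groupingBy(dataSourceOf, Collectors.counting()));
  }
}
```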

View File

@@ -450,6 +450,11 @@ public class WorkerTaskManagerTest
return new NoopTask(id, null, null, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
}
private NoopTask createNoopTask(String id, String dataSource)
{
return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
}
/**
* Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for it to be complete. Common preamble
* for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}.
@@ -480,4 +485,43 @@ public class WorkerTaskManagerTest
EasyMock.reset(overlordClient);
return task;
}
@Test
public void getWorkerTaskStatsTest() throws Exception
{
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
EasyMock.replay(overlordClient);
Task task1 = createNoopTask("task1", "wikipedia");
Task task2 = createNoopTask("task2", "wikipedia");
Task task3 = createNoopTask("task3", "animals");
workerTaskManager.start();
// before assigning tasks we should get no running tasks
Assert.assertEquals(workerTaskManager.getWorkerRunningTasks().size(), 0L);
workerTaskManager.assignTask(task1);
workerTaskManager.assignTask(task2);
workerTaskManager.assignTask(task3);
Thread.sleep(25);
//should return all 3 tasks as running
Assert.assertEquals(workerTaskManager.getWorkerRunningTasks(), ImmutableMap.of(
"wikipedia", 2L,
"animals", 1L
));
Map<String, Long> runningTasks;
do {
runningTasks = workerTaskManager.getWorkerRunningTasks();
Thread.sleep(10);
} while (!runningTasks.isEmpty());
// When running tasks are empty, all tasks should be reported as completed
Assert.assertEquals(workerTaskManager.getWorkerCompletedTasks(), ImmutableMap.of(
"wikipedia", 2L,
"animals", 1L
));
Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L);
}
}
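
One note on the test above: the fixed `Thread.sleep(25)` before the first running-task assertion can be racy on a slow CI host. A small polling helper along these lines (a hypothetical sketch for the test class, not part of the commit) would make the assertion deterministic:

```java
import java.util.Map;

// Hypothetical helper for the test class: poll until the expected per-datasource
// running-task counts are reported, or fail after a timeout.
static void awaitRunningTasks(WorkerTaskManager manager, Map<String, Long> expected, long timeoutMillis)
    throws InterruptedException
{
  final long deadline = System.currentTimeMillis() + timeoutMillis;
  while (!expected.equals(manager.getWorkerRunningTasks())) {
    if (System.currentTimeMillis() > deadline) {
      throw new AssertionError("Timed out; last running tasks: " + manager.getWorkerRunningTasks());
    }
    Thread.sleep(10);
  }
}
```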

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import java.util.Map;
/**
* Provides task count metrics for the indexers
* These metrics are reported by indexers
*/
public interface IndexerTaskCountStatsProvider
{
/**
* Map from datasource name to the number of running tasks on the Indexer.
*/
Map<String, Long> getWorkerRunningTasks();
/**
* Map from datasource name to the number of assigned tasks to the Indexer.
*/
Map<String, Long> getWorkerAssignedTasks();
/**
* Map from datasource name to the number of completed tasks by the Indexer.
*/
Map<String, Long> getWorkerCompletedTasks();
}

View File

@@ -28,11 +28,13 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.query.DruidMetrics;
import java.util.Map;
import java.util.Set;
public class WorkerTaskCountStatsMonitor extends AbstractMonitor
{
private final WorkerTaskCountStatsProvider statsProvider;
private final IndexerTaskCountStatsProvider indexerStatsProvider;
private final String workerCategory;
private final String workerVersion;
private final boolean isMiddleManager;
@@ -46,9 +48,11 @@ public class WorkerTaskCountStatsMonitor extends AbstractMonitor
this.isMiddleManager = nodeRoles.contains(NodeRole.MIDDLE_MANAGER);
if (isMiddleManager) {
this.statsProvider = injector.getInstance(WorkerTaskCountStatsProvider.class);
this.indexerStatsProvider = null;
this.workerCategory = statsProvider.getWorkerCategory();
this.workerVersion = statsProvider.getWorkerVersion();
} else {
this.indexerStatsProvider = injector.getInstance(IndexerTaskCountStatsProvider.class);
this.statsProvider = null;
this.workerCategory = null;
this.workerVersion = null;
@@ -64,6 +68,10 @@ public class WorkerTaskCountStatsMonitor extends AbstractMonitor
emit(emitter, "worker/taskSlot/idle/count", statsProvider.getWorkerIdleTaskSlotCount());
emit(emitter, "worker/taskSlot/total/count", statsProvider.getWorkerTotalTaskSlotCount());
emit(emitter, "worker/taskSlot/used/count", statsProvider.getWorkerUsedTaskSlotCount());
} else {
emit(emitter, "worker/task/running/count", indexerStatsProvider.getWorkerRunningTasks());
emit(emitter, "worker/task/assigned/count", indexerStatsProvider.getWorkerAssignedTasks());
emit(emitter, "worker/task/completed/count", indexerStatsProvider.getWorkerCompletedTasks());
}
return true;
}
@@ -77,4 +85,15 @@ public class WorkerTaskCountStatsMonitor extends AbstractMonitor
emitter.emit(builder.setMetric(metricName, value));
}
}
public void emit(ServiceEmitter emitter, String metricName, Map<String, Long> dataSourceTaskMap)
{
for (Map.Entry<String, Long> dataSourceTaskCount : dataSourceTaskMap.entrySet()) {
if (dataSourceTaskCount.getValue() != null) {
ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
builder.setDimension(DruidMetrics.DATASOURCE, dataSourceTaskCount.getKey());
emitter.emit(builder.setMetric(metricName, dataSourceTaskCount.getValue()));
}
}
}
}

View File

@@ -31,14 +31,17 @@ import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Map;
public class WorkerTaskCountStatsMonitorTest
{
private Injector injectorForMiddleManager;
private Injector injectorForMiddleManagerNullStats;
private Injector injectorForPeon;
private Injector injectorForIndexer;
private WorkerTaskCountStatsProvider statsProvider;
private IndexerTaskCountStatsProvider indexerTaskStatsProvider;
private WorkerTaskCountStatsProvider nullStatsProvider;
@Before
@@ -89,6 +92,36 @@ public class WorkerTaskCountStatsMonitorTest
}
};
indexerTaskStatsProvider = new IndexerTaskCountStatsProvider()
{
@Override
public Map<String, Long> getWorkerRunningTasks()
{
return ImmutableMap.of(
"wikipedia", 2L,
"animals", 3L
);
}
@Override
public Map<String, Long> getWorkerAssignedTasks()
{
return ImmutableMap.of(
"products", 3L,
"orders", 7L
);
}
@Override
public Map<String, Long> getWorkerCompletedTasks()
{
return ImmutableMap.of(
"inventory", 8L,
"metrics", 9L
);
}
};
nullStatsProvider = new WorkerTaskCountStatsProvider()
{
@Nullable
@@ -156,6 +189,12 @@ public class WorkerTaskCountStatsMonitorTest
injectorForPeon = Guice.createInjector(
ImmutableList.of(binder -> {})
);
injectorForIndexer = Guice.createInjector(
ImmutableList.of(
binder -> binder.bind(IndexerTaskCountStatsProvider.class).toInstance(indexerTaskStatsProvider)
)
);
}
@Test
@@ -193,6 +232,45 @@ public class WorkerTaskCountStatsMonitorTest
);
}
@Test
public void testMonitorIndexer()
{
final WorkerTaskCountStatsMonitor monitor =
new WorkerTaskCountStatsMonitor(injectorForIndexer, ImmutableSet.of(NodeRole.INDEXER));
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
monitor.doMonitor(emitter);
Assert.assertEquals(6, emitter.getEvents().size());
emitter.verifyValue(
"worker/task/running/count",
ImmutableMap.of("dataSource", "wikipedia"),
2L
);
emitter.verifyValue(
"worker/task/running/count",
ImmutableMap.of("dataSource", "animals"),
3L
);
emitter.verifyValue(
"worker/task/assigned/count",
ImmutableMap.of("dataSource", "products"),
3L
);
emitter.verifyValue(
"worker/task/assigned/count",
ImmutableMap.of("dataSource", "orders"),
7L
);
emitter.verifyValue(
"worker/task/completed/count",
ImmutableMap.of("dataSource", "inventory"),
8L
);
emitter.verifyValue(
"worker/task/completed/count",
ImmutableMap.of("dataSource", "metrics"),
9L
);
}
@Test
public void testMonitorWithNulls()
{
@@ -202,14 +280,4 @@ public class WorkerTaskCountStatsMonitorTest
monitor.doMonitor(emitter);
Assert.assertEquals(0, emitter.getEvents().size());
}
@Test
public void testMonitorNotMiddleManager()
{
final WorkerTaskCountStatsMonitor monitor =
new WorkerTaskCountStatsMonitor(injectorForPeon, ImmutableSet.of(NodeRole.PEON));
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
monitor.doMonitor(emitter);
Assert.assertEquals(0, emitter.getEvents().size());
}
}

View File

@@ -61,6 +61,7 @@ import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.WorkerTaskManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.indexing.worker.shuffle.ShuffleModule;
import org.apache.druid.java.util.common.logger.Logger;
@@ -79,6 +80,7 @@ import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.CliIndexerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider;
import org.eclipse.jetty.server.Server;
import java.util.List;
@@ -152,6 +154,7 @@ public class CliIndexer extends ServerRunnable
binder.bind(TaskRunner.class).to(ThreadingTaskRunner.class);
binder.bind(QuerySegmentWalker.class).to(ThreadingTaskRunner.class);
binder.bind(ThreadingTaskRunner.class).in(LazySingleton.class);
binder.bind(IndexerTaskCountStatsProvider.class).to(WorkerTaskManager.class);
CliPeon.bindRowIngestionMeters(binder);
CliPeon.bindChatHandler(binder);