diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 01a56cac2d2..0c7a492d807 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -294,6 +294,9 @@ If the JVM does not support CPU time measurement for the current thread, `ingest |`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included, and is only supported for Middle Manager nodes.| `category`, `workerVersion`|Varies| |`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies| |`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.| `category`, `workerVersion`|Varies| +|`worker/task/assigned/count`|Number of tasks assigned to an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| +|`worker/task/completed/count`|Number of tasks completed by an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| +|`worker/task/running/count`|Number of tasks running on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies| ## Shuffle metrics (Native parallel task) diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index c3ad6e11efd..ad065c63d39 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -71,6 +71,9 @@ "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, + "worker/task/assigned/count" : { "dimensions" : ["dataSource"], "type" : "count" }, + "worker/task/running/count" : { "dimensions" : ["dataSource"], "type" : "count" }, + "worker/task/completed/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "worker/task/failed/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, "worker/task/success/count" : { "dimensions" : ["category", "workerVersion"], "type" : "count" }, "worker/taskSlot/idle/count" : { "dimensions" : ["category", "workerVersion"], "type" : "gauge" }, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 86bb642ac2f..729ac1d1617 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -51,12 +51,16 @@ import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; +import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider; +import org.apache.druid.utils.CollectionUtils; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -67,6 +71,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -76,7 +81,7 @@ import java.util.stream.Collectors; * starts running and completed task on disk is deleted based on a periodic schedule where overlord is asked for * active tasks to see which completed tasks are safe to delete. */ -public class WorkerTaskManager +public class WorkerTaskManager implements IndexerTaskCountStatsProvider { private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class); @@ -607,6 +612,34 @@ public class WorkerTaskManager completedTasks.put(taskId, taskAnnouncement); } + private Map getNumTasksPerDatasource(Collection taskList, Function getDataSourceFunc) + { + final Map dataSourceToTaskCount = new HashMap<>(); + + for (T task : taskList) { + dataSourceToTaskCount.merge(getDataSourceFunc.apply(task), 1L, Long::sum); + } + return dataSourceToTaskCount; + } + + @Override + public Map getWorkerRunningTasks() + { + return getNumTasksPerDatasource(CollectionUtils.mapValues(runningTasks, detail -> detail.task).values(), Task::getDataSource); + } + + @Override + public Map getWorkerAssignedTasks() + { + return getNumTasksPerDatasource(assignedTasks.values(), Task::getDataSource); + } + + @Override + public Map getWorkerCompletedTasks() + { + return getNumTasksPerDatasource(this.getCompletedTasks().values(), TaskAnnouncement::getTaskDataSource); + } + private static class TaskDetails { private final Task task; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 1ccc956ca03..1aeb67d5a40 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -450,6 +450,11 @@ public class WorkerTaskManagerTest return new NoopTask(id, null, null, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0)); } + private NoopTask createNoopTask(String id, String dataSource) + { + return new NoopTask(id, null, dataSource, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0)); + } + /** * Start the {@link #workerTaskManager}, submit a {@link NoopTask}, wait for it to be complete. Common preamble * for various tests of {@link WorkerTaskManager#doCompletedTasksCleanup()}. @@ -480,4 +485,43 @@ public class WorkerTaskManagerTest EasyMock.reset(overlordClient); return task; } + + @Test + public void getWorkerTaskStatsTest() throws Exception + { + EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes(); + EasyMock.replay(overlordClient); + + Task task1 = createNoopTask("task1", "wikipedia"); + Task task2 = createNoopTask("task2", "wikipedia"); + Task task3 = createNoopTask("task3", "animals"); + + workerTaskManager.start(); + // befor assigning tasks we should get no running tasks + Assert.assertEquals(workerTaskManager.getWorkerRunningTasks().size(), 0L); + + workerTaskManager.assignTask(task1); + workerTaskManager.assignTask(task2); + workerTaskManager.assignTask(task3); + + Thread.sleep(25); + //should return all 3 tasks as running + Assert.assertEquals(workerTaskManager.getWorkerRunningTasks(), ImmutableMap.of( + "wikipedia", 2L, + "animals", 1L + )); + + Map runningTasks; + do { + runningTasks = workerTaskManager.getWorkerRunningTasks(); + Thread.sleep(10); + } while (!runningTasks.isEmpty()); + + // When running tasks are empty all task should be reported as completed + Assert.assertEquals(workerTaskManager.getWorkerCompletedTasks(), ImmutableMap.of( + "wikipedia", 2L, + "animals", 1L + )); + Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L); + } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java new file mode 100644 index 00000000000..735bc27abb3 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/IndexerTaskCountStatsProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import java.util.Map; + +/** + * Provides task count metrics for the indexers + * These metrics are reported by indexers + */ +public interface IndexerTaskCountStatsProvider +{ + /** + * Map from datasource name to the number of running tasks on the Indexer. + */ + Map getWorkerRunningTasks(); + + /** + * Map from datasource name to the number of assigned tasks to the Indexer. + */ + Map getWorkerAssignedTasks(); + + /** + * Map from datasource name to the number of completed tasks by the Indexer. + */ + Map getWorkerCompletedTasks(); +} diff --git a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java index d6b322c2f0e..d07311c1a46 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java @@ -28,11 +28,13 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; import org.apache.druid.query.DruidMetrics; +import java.util.Map; import java.util.Set; public class WorkerTaskCountStatsMonitor extends AbstractMonitor { private final WorkerTaskCountStatsProvider statsProvider; + private final IndexerTaskCountStatsProvider indexerStatsProvider; private final String workerCategory; private final String workerVersion; private final boolean isMiddleManager; @@ -46,9 +48,11 @@ public class WorkerTaskCountStatsMonitor extends AbstractMonitor this.isMiddleManager = nodeRoles.contains(NodeRole.MIDDLE_MANAGER); if (isMiddleManager) { this.statsProvider = injector.getInstance(WorkerTaskCountStatsProvider.class); + this.indexerStatsProvider = null; this.workerCategory = statsProvider.getWorkerCategory(); this.workerVersion = statsProvider.getWorkerVersion(); } else { + this.indexerStatsProvider = injector.getInstance(IndexerTaskCountStatsProvider.class); this.statsProvider = null; this.workerCategory = null; this.workerVersion = null; @@ -64,6 +68,10 @@ public class WorkerTaskCountStatsMonitor extends AbstractMonitor emit(emitter, "worker/taskSlot/idle/count", statsProvider.getWorkerIdleTaskSlotCount()); emit(emitter, "worker/taskSlot/total/count", statsProvider.getWorkerTotalTaskSlotCount()); emit(emitter, "worker/taskSlot/used/count", statsProvider.getWorkerUsedTaskSlotCount()); + } else { + emit(emitter, "worker/task/running/count", indexerStatsProvider.getWorkerRunningTasks()); + emit(emitter, "worker/task/assigned/count", indexerStatsProvider.getWorkerAssignedTasks()); + emit(emitter, "worker/task/completed/count", indexerStatsProvider.getWorkerCompletedTasks()); } return true; } @@ -77,4 +85,15 @@ public class WorkerTaskCountStatsMonitor extends AbstractMonitor emitter.emit(builder.setMetric(metricName, value)); } } + + public void emit(ServiceEmitter emitter, String metricName, Map dataSourceTaskMap) + { + for (Map.Entry dataSourceTaskCount : dataSourceTaskMap.entrySet()) { + if (dataSourceTaskCount.getValue() != null) { + ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + builder.setDimension(DruidMetrics.DATASOURCE, dataSourceTaskCount.getKey()); + emitter.emit(builder.setMetric(metricName, dataSourceTaskCount.getValue())); + } + } + } } diff --git a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java index fadb2f98826..ff9fcffb8d9 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java @@ -31,14 +31,17 @@ import org.junit.Before; import org.junit.Test; import javax.annotation.Nullable; +import java.util.Map; public class WorkerTaskCountStatsMonitorTest { private Injector injectorForMiddleManager; private Injector injectorForMiddleManagerNullStats; private Injector injectorForPeon; + private Injector injectorForIndexer; private WorkerTaskCountStatsProvider statsProvider; + private IndexerTaskCountStatsProvider indexerTaskStatsProvider; private WorkerTaskCountStatsProvider nullStatsProvider; @Before @@ -89,6 +92,36 @@ public class WorkerTaskCountStatsMonitorTest } }; + indexerTaskStatsProvider = new IndexerTaskCountStatsProvider() + { + @Override + public Map getWorkerRunningTasks() + { + return ImmutableMap.of( + "wikipedia", 2L, + "animals", 3L + ); + } + + @Override + public Map getWorkerAssignedTasks() + { + return ImmutableMap.of( + "products", 3L, + "orders", 7L + ); + } + + @Override + public Map getWorkerCompletedTasks() + { + return ImmutableMap.of( + "inventory", 8L, + "metrics", 9L + ); + } + }; + nullStatsProvider = new WorkerTaskCountStatsProvider() { @Nullable @@ -156,6 +189,12 @@ public class WorkerTaskCountStatsMonitorTest injectorForPeon = Guice.createInjector( ImmutableList.of(binder -> {}) ); + + injectorForIndexer = Guice.createInjector( + ImmutableList.of( + binder -> binder.bind(IndexerTaskCountStatsProvider.class).toInstance(indexerTaskStatsProvider) + ) + ); } @Test @@ -193,6 +232,45 @@ public class WorkerTaskCountStatsMonitorTest ); } + @Test + public void testMonitorIndexer() + { + final WorkerTaskCountStatsMonitor monitor = + new WorkerTaskCountStatsMonitor(injectorForIndexer, ImmutableSet.of(NodeRole.INDEXER)); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + monitor.doMonitor(emitter); + Assert.assertEquals(6, emitter.getEvents().size()); + emitter.verifyValue( + "worker/task/running/count", + ImmutableMap.of("dataSource", "wikipedia"), + 2L + ); + emitter.verifyValue( + "worker/task/running/count", + ImmutableMap.of("dataSource", "animals"), + 3L + ); + emitter.verifyValue( + "worker/task/assigned/count", + ImmutableMap.of("dataSource", "products"), + 3L + ); + emitter.verifyValue( + "worker/task/assigned/count", + ImmutableMap.of("dataSource", "orders"), + 7L + ); + emitter.verifyValue( + "worker/task/completed/count", + ImmutableMap.of("dataSource", "inventory"), + 8L + ); + emitter.verifyValue( + "worker/task/completed/count", + ImmutableMap.of("dataSource", "metrics"), + 9L + ); + } @Test public void testMonitorWithNulls() { @@ -202,14 +280,4 @@ public class WorkerTaskCountStatsMonitorTest monitor.doMonitor(emitter); Assert.assertEquals(0, emitter.getEvents().size()); } - - @Test - public void testMonitorNotMiddleManager() - { - final WorkerTaskCountStatsMonitor monitor = - new WorkerTaskCountStatsMonitor(injectorForPeon, ImmutableSet.of(NodeRole.PEON)); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); - monitor.doMonitor(emitter); - Assert.assertEquals(0, emitter.getEvents().size()); - } } diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index aea0922efb2..7b90b2eddcd 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -61,6 +61,7 @@ import org.apache.druid.indexing.common.TaskReportFileWriter; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.ThreadingTaskRunner; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.WorkerTaskManager; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.shuffle.ShuffleModule; import org.apache.druid.java.util.common.logger.Logger; @@ -79,6 +80,7 @@ import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.jetty.CliIndexerServerModule; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; +import org.apache.druid.server.metrics.IndexerTaskCountStatsProvider; import org.eclipse.jetty.server.Server; import java.util.List; @@ -152,6 +154,7 @@ public class CliIndexer extends ServerRunnable binder.bind(TaskRunner.class).to(ThreadingTaskRunner.class); binder.bind(QuerySegmentWalker.class).to(ThreadingTaskRunner.class); binder.bind(ThreadingTaskRunner.class).in(LazySingleton.class); + binder.bind(IndexerTaskCountStatsProvider.class).to(WorkerTaskManager.class); CliPeon.bindRowIngestionMeters(binder); CliPeon.bindChatHandler(binder);