Worker level task metrics (#12446)

* * fix metric name inconsistency

* * add task slot metrics for middle managers

* * add new WorkerTaskCountStatsMonitor to report task count metrics
  from worker

* * more stuff

* * remove unused variable

* * more stuff

* * add javadocs

* * fix checkstyle

* * fix hadoop test failure

* * cleanup

* * add more code coverage in tests

* * fix test failure

* * add docs

* * increase code coverage

* * fix spelling

* * fix failing tests

* * remove dead code

* * fix spelling
This commit is contained in:
zachjsh 2022-04-26 12:44:44 -04:00 committed by GitHub
parent 4868ef9529
commit 564d6defd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 454 additions and 11 deletions

View File

@ -383,6 +383,8 @@ Metric monitoring is an essential part of Druid operations. The following monit
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
|`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.|
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.|
|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting, the number of successful/failed tasks, and metrics about task slot usage for the reporting worker, per emission period. Only supported by middleManager node types.|
For example, you might configure monitors on all processes for system and JVM information within `common.runtime.properties` as follows:

View File

@ -214,6 +214,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.|
|`worker/task/failed/count`|Number of failed tasks run on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
|`worker/task/success/count`|Number of successful tasks run on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
|`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
|`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
## Shuffle metrics (Native parallel task)

View File

@ -63,9 +63,15 @@
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"worker/task/failed/count" : { "dimensions" : ["category", "version"], "type" : "count" },
"worker/task/success/count" : { "dimensions" : ["category", "version"], "type" : "count" },
"worker/taskSlot/idle/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
"worker/taskSlot/total/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
"worker/taskSlot/used/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
"taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/busy/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/used/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge" },

View File

@ -63,6 +63,7 @@ import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.server.metrics.MonitorsConfig;
import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.DateTime;
@ -83,13 +84,14 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Runs tasks in separate processes using the "internal peon" verb.
*/
public class ForkingTaskRunner
extends BaseRestorableTaskRunner<ForkingTaskRunner.ForkingTaskRunnerWorkItem>
implements TaskLogStreamer
implements TaskLogStreamer, WorkerTaskCountStatsProvider
{
private static final EmittingLogger LOGGER = new EmittingLogger(ForkingTaskRunner.class);
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
@ -104,6 +106,11 @@ public class ForkingTaskRunner
private volatile boolean stopping = false;
private static final AtomicLong LAST_REPORTED_FAILED_TASK_COUNT = new AtomicLong();
private static final AtomicLong FAILED_TASK_COUNT = new AtomicLong();
private static final AtomicLong SUCCESSFUL_TASK_COUNT = new AtomicLong();
private static final AtomicLong LAST_REPORTED_SUCCESSFUL_TASK_COUNT = new AtomicLong();
@Inject
public ForkingTaskRunner(
ForkingTaskRunnerConfig config,
@ -399,7 +406,11 @@ public class ForkingTaskRunner
)
);
}
if (status.isSuccess()) {
SUCCESSFUL_TASK_COUNT.incrementAndGet();
} else {
FAILED_TASK_COUNT.incrementAndGet();
}
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
return status;
}
@ -690,18 +701,12 @@ public class ForkingTaskRunner
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
if (config.getPorts() != null && !config.getPorts().isEmpty()) {
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getPorts().size()));
}
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1));
return ImmutableMap.of(workerConfig.getCategory(), getTotalTaskSlotCountLong());
}
public long getTotalTaskSlotCountLong()
{
if (config.getPorts() != null && !config.getPorts().isEmpty()) {
return config.getPorts().size();
}
return config.getEndPort() - config.getStartPort() + 1;
return workerConfig.getCapacity();
}
@Override
@ -733,6 +738,54 @@ public class ForkingTaskRunner
return ImmutableMap.of(workerConfig.getCategory(), 0L);
}
/**
 * Returns the number of tasks that failed since the previous call to this method.
 *
 * The value is the difference between the cumulative failure counter and the baseline recorded
 * by the previous call. The baseline is replaced with {@code getAndSet}, so fetching the old
 * baseline and installing the new one is a single atomic operation; overlapping callers can
 * therefore never subtract the same baseline and report the same failures twice.
 *
 * @return failures accumulated since the last call
 */
@Override
public Long getWorkerFailedTaskCount()
{
  final long failedTaskCount = FAILED_TASK_COUNT.get();
  // Swap the baseline atomically instead of a separate get() followed by set().
  final long lastReportedFailedTaskCount = LAST_REPORTED_FAILED_TASK_COUNT.getAndSet(failedTaskCount);
  return failedTaskCount - lastReportedFailedTaskCount;
}
/**
 * Returns the number of task slots on this worker that are currently unused: the total slot
 * count minus the used slot count, floored at zero.
 */
@Override
public Long getWorkerIdleTaskSlotCount()
{
  final long unusedSlots = getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong();
  // Never report a negative number of idle slots.
  return unusedSlots > 0 ? unusedSlots : 0L;
}
/**
 * Returns the number of task slots in use on this worker, taken from the number of ports the
 * port finder reports as used.
 */
@Override
public Long getWorkerUsedTaskSlotCount()
{
  final long usedPorts = portFinder.findUsedPortCount();
  return usedPorts;
}
/**
 * Returns the total number of task slots on this worker, as computed by
 * {@link #getTotalTaskSlotCountLong()}.
 */
@Override
public Long getWorkerTotalTaskSlotCount()
{
  final long totalSlots = getTotalTaskSlotCountLong();
  return totalSlots;
}
/**
 * Returns the category of this worker as given by the worker config.
 */
@Override
public String getWorkerCategory()
{
  final String category = workerConfig.getCategory();
  return category;
}
/**
 * Returns the version of this worker as given by the worker config.
 */
@Override
public String getWorkerVersion()
{
  final String version = workerConfig.getVersion();
  return version;
}
/**
 * Returns the number of tasks that succeeded since the previous call to this method.
 *
 * The value is the difference between the cumulative success counter and the baseline recorded
 * by the previous call. The baseline is replaced with {@code getAndSet}, so fetching the old
 * baseline and installing the new one is a single atomic operation; overlapping callers can
 * therefore never subtract the same baseline and report the same successes twice.
 *
 * @return successes accumulated since the last call
 */
@Override
public Long getWorkerSuccessfulTaskCount()
{
  final long successfulTaskCount = SUCCESSFUL_TASK_COUNT.get();
  // Swap the baseline atomically instead of a separate get() followed by set().
  final long lastReportedSuccessfulTaskCount = LAST_REPORTED_SUCCESSFUL_TASK_COUNT.getAndSet(successfulTaskCount);
  return successfulTaskCount - lastReportedSuccessfulTaskCount;
}
protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final Task task;

View File

@ -231,6 +231,12 @@ public class ForkingTaskRunnerTest
/**
 * Test stub: checks the worker-level slot metrics while the task is still occupying a slot,
 * then reports a non-zero exit code.
 */
@Override
int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
{
  // Expected values come from a freshly constructed WorkerConfig.
  final WorkerConfig expectedConfig = new WorkerConfig();
  final long expectedCapacity = expectedConfig.getCapacity();
  Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
  Assert.assertEquals(expectedCapacity, (long) this.getWorkerTotalTaskSlotCount());
  Assert.assertEquals(expectedCapacity - 1, (long) this.getWorkerIdleTaskSlotCount());
  Assert.assertEquals(expectedConfig.getCategory(), this.getWorkerCategory());
  Assert.assertEquals(expectedConfig.getVersion(), this.getWorkerVersion());
  // Emulate task process failure
  return 1;
}
@ -242,6 +248,8 @@ public class ForkingTaskRunnerTest
"Task execution process exited unsuccessfully with code[1]. See middleManager logs for more details.",
status.getErrorMsg()
);
Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
}
@Test
@ -294,6 +302,12 @@ public class ForkingTaskRunnerTest
/**
 * Test stub: checks the worker-level slot metrics while the task is still occupying a slot,
 * then reports a zero exit code.
 */
@Override
int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
{
  // Expected values come from a freshly constructed WorkerConfig.
  final WorkerConfig expectedConfig = new WorkerConfig();
  final long expectedCapacity = expectedConfig.getCapacity();
  Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
  Assert.assertEquals(expectedCapacity, (long) this.getWorkerTotalTaskSlotCount());
  Assert.assertEquals(expectedCapacity - 1, (long) this.getWorkerIdleTaskSlotCount());
  Assert.assertEquals(expectedConfig.getCategory(), this.getWorkerCategory());
  Assert.assertEquals(expectedConfig.getVersion(), this.getWorkerVersion());
  return 0;
}
};
@ -301,6 +315,8 @@ public class ForkingTaskRunnerTest
final TaskStatus status = forkingTaskRunner.run(task).get();
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
Assert.assertNull(status.getErrorMsg());
Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
}
@Test

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import java.util.Set;
/**
 * Monitor that emits worker-level task and task slot counts obtained from a
 * {@link WorkerTaskCountStatsProvider}.
 *
 * Metrics are emitted only when the node roles passed to the constructor contain
 * {@link NodeRole#MIDDLE_MANAGER}; for any other set of roles the monitor emits nothing.
 */
public class WorkerTaskCountStatsMonitor extends AbstractMonitor
{
  private final WorkerTaskCountStatsProvider statsProvider;
  private final String workerCategory;
  private final String workerVersion;
  private final boolean isMiddleManager;

  /**
   * @param injector  used to look up the {@link WorkerTaskCountStatsProvider}; it is only
   *                  consulted when the node is a middle manager
   * @param nodeRoles roles of the current node
   */
  @Inject
  public WorkerTaskCountStatsMonitor(
      Injector injector,
      @Self Set<NodeRole> nodeRoles
  )
  {
    this.isMiddleManager = nodeRoles.contains(NodeRole.MIDDLE_MANAGER);

    // Everything stays null unless this node is a middle manager.
    WorkerTaskCountStatsProvider provider = null;
    String category = null;
    String version = null;
    if (isMiddleManager) {
      provider = injector.getInstance(WorkerTaskCountStatsProvider.class);
      category = provider.getWorkerCategory();
      version = provider.getWorkerVersion();
    }
    this.statsProvider = provider;
    this.workerCategory = category;
    this.workerVersion = version;
  }

  /**
   * Emits the five worker metrics, in a fixed order, when running on a middle manager.
   *
   * @return always {@code true}
   */
  @Override
  public boolean doMonitor(ServiceEmitter emitter)
  {
    if (!isMiddleManager) {
      return true;
    }
    emit(emitter, "worker/task/failed/count", statsProvider.getWorkerFailedTaskCount());
    emit(emitter, "worker/task/success/count", statsProvider.getWorkerSuccessfulTaskCount());
    emit(emitter, "worker/taskSlot/idle/count", statsProvider.getWorkerIdleTaskSlotCount());
    emit(emitter, "worker/taskSlot/total/count", statsProvider.getWorkerTotalTaskSlotCount());
    emit(emitter, "worker/taskSlot/used/count", statsProvider.getWorkerUsedTaskSlotCount());
    return true;
  }

  /**
   * Emits one metric event carrying the worker category and version as dimensions.
   * A {@code null} value is skipped silently.
   */
  private void emit(ServiceEmitter emitter, String metricName, Long value)
  {
    if (value == null) {
      return;
    }
    final ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder();
    eventBuilder.setDimension("category", workerCategory);
    eventBuilder.setDimension("version", workerVersion);
    emitter.emit(eventBuilder.build(metricName, value));
  }
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
/**
 * Provides task count and task slot count statistics at the level of an individual worker node.
 * These metrics are reported by workers, like middle-managers.
 *
 * WorkerTaskCountStatsMonitor reads these values and does not emit a metric whose value is
 * {@code null}.
 */
public interface WorkerTaskCountStatsProvider
{
/**
 * The number of failed tasks run on worker during emission period.
 *
 * @return the failed task count for the emission period
 */
Long getWorkerFailedTaskCount();
/**
 * The number of successful tasks run on worker during emission period.
 *
 * @return the successful task count for the emission period
 */
Long getWorkerSuccessfulTaskCount();
/**
 * The number of idle task slots on worker.
 *
 * @return the idle task slot count
 */
Long getWorkerIdleTaskSlotCount();
/**
 * The number of total task slots on worker.
 *
 * @return the total task slot count
 */
Long getWorkerTotalTaskSlotCount();
/**
 * The number of used task slots on worker.
 *
 * @return the used task slot count
 */
Long getWorkerUsedTaskSlotCount();
/**
 * The worker category. WorkerTaskCountStatsMonitor attaches it to every emitted metric as the
 * "category" dimension.
 *
 * @return the worker category
 */
String getWorkerCategory();
/**
 * The worker version. WorkerTaskCountStatsMonitor attaches it to every emitted metric as the
 * "version" dimension.
 *
 * @return the worker version
 */
String getWorkerVersion();
}

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
/**
 * Tests for {@link WorkerTaskCountStatsMonitor}: emission with a populated provider, with a
 * provider that returns only nulls, and on a node that is not a middle manager.
 */
public class WorkerTaskCountStatsMonitorTest
{
  private Injector injectorForMiddleManager;
  private Injector injectorForMiddleManagerNullStats;
  private Injector injectorForPeon;

  private WorkerTaskCountStatsProvider statsProvider;
  private WorkerTaskCountStatsProvider nullStatsProvider;

  @Before
  public void setUp()
  {
    // Provider with a distinct value per metric so each event can be told apart.
    statsProvider = new WorkerTaskCountStatsProvider()
    {
      @Override
      public Long getWorkerFailedTaskCount()
      {
        return 4L;
      }

      @Override
      public Long getWorkerSuccessfulTaskCount()
      {
        return 2L;
      }

      @Override
      public Long getWorkerIdleTaskSlotCount()
      {
        return 3L;
      }

      @Override
      public Long getWorkerTotalTaskSlotCount()
      {
        return 5L;
      }

      @Override
      public Long getWorkerUsedTaskSlotCount()
      {
        return 1L;
      }

      @Override
      public String getWorkerCategory()
      {
        return "workerCategory";
      }

      @Override
      public String getWorkerVersion()
      {
        return "workerVersion";
      }
    };

    // Provider that returns null from every method.
    nullStatsProvider = new WorkerTaskCountStatsProvider()
    {
      @Nullable
      @Override
      public Long getWorkerFailedTaskCount()
      {
        return null;
      }

      @Nullable
      @Override
      public Long getWorkerSuccessfulTaskCount()
      {
        return null;
      }

      @Nullable
      @Override
      public Long getWorkerIdleTaskSlotCount()
      {
        return null;
      }

      @Nullable
      @Override
      public Long getWorkerTotalTaskSlotCount()
      {
        return null;
      }

      @Nullable
      @Override
      public Long getWorkerUsedTaskSlotCount()
      {
        return null;
      }

      @Nullable
      @Override
      public String getWorkerCategory()
      {
        return null;
      }

      @Nullable
      @Override
      public String getWorkerVersion()
      {
        return null;
      }
    };

    injectorForMiddleManager = injectorBinding(statsProvider);
    injectorForMiddleManagerNullStats = injectorBinding(nullStatsProvider);
    // The peon injector has no WorkerTaskCountStatsProvider binding at all.
    injectorForPeon = Guice.createInjector(
        ImmutableList.of(
            (Module) binder -> {}
        )
    );
  }

  @Test
  public void testMonitor()
  {
    final StubServiceEmitter emitter = monitorOnce(injectorForMiddleManager, NodeRole.MIDDLE_MANAGER);

    Assert.assertEquals(5, emitter.getEvents().size());
    assertEmittedMetric(emitter, 0, "worker/task/failed/count", 4L);
    assertEmittedMetric(emitter, 1, "worker/task/success/count", 2L);
    assertEmittedMetric(emitter, 2, "worker/taskSlot/idle/count", 3L);
    assertEmittedMetric(emitter, 3, "worker/taskSlot/total/count", 5L);
    assertEmittedMetric(emitter, 4, "worker/taskSlot/used/count", 1L);
  }

  @Test
  public void testMonitorWithNulls()
  {
    final StubServiceEmitter emitter = monitorOnce(injectorForMiddleManagerNullStats, NodeRole.MIDDLE_MANAGER);
    Assert.assertEquals(0, emitter.getEvents().size());
  }

  @Test
  public void testMonitorNotMiddleManager()
  {
    final StubServiceEmitter emitter = monitorOnce(injectorForPeon, NodeRole.PEON);
    Assert.assertEquals(0, emitter.getEvents().size());
  }

  /**
   * Creates an injector whose only binding maps {@link WorkerTaskCountStatsProvider} to the
   * given instance.
   */
  private static Injector injectorBinding(WorkerTaskCountStatsProvider provider)
  {
    return Guice.createInjector(
        ImmutableList.of(
            (Module) binder -> binder.bind(WorkerTaskCountStatsProvider.class).toInstance(provider)
        )
    );
  }

  /**
   * Builds a monitor for a node with the single given role, runs one monitoring pass against a
   * fresh stub emitter and returns that emitter.
   */
  private static StubServiceEmitter monitorOnce(Injector injector, NodeRole role)
  {
    final WorkerTaskCountStatsMonitor monitor =
        new WorkerTaskCountStatsMonitor(injector, ImmutableSet.of(role));
    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
    monitor.doMonitor(emitter);
    return emitter;
  }

  /**
   * Asserts that the event at the given position has the expected metric name and value and
   * carries the category and version of the populated stats provider.
   */
  private static void assertEmittedMetric(
      StubServiceEmitter emitter,
      int index,
      String expectedMetric,
      Long expectedValue
  )
  {
    Assert.assertEquals(expectedMetric, emitter.getEvents().get(index).toMap().get("metric"));
    Assert.assertEquals("workerCategory", emitter.getEvents().get(index).toMap().get("category"));
    Assert.assertEquals("workerVersion", emitter.getEvents().get(index).toMap().get("version"));
    Assert.assertEquals(expectedValue, emitter.getEvents().get(index).toMap().get("value"));
  }
}

View File

@ -78,6 +78,7 @@ import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.eclipse.jetty.server.Server;
@ -137,6 +138,7 @@ public class CliMiddleManager extends ServerRunnable
binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
binder.bind(WorkerTaskCountStatsProvider.class).to(ForkingTaskRunner.class);
binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>>() {})

View File

@ -1428,6 +1428,7 @@ Sys
SysMonitor
TaskCountStatsMonitor
TaskSlotCountStatsMonitor
WorkerTaskCountStatsMonitor
bufferCapacity
bufferpoolName
cms