Add TaskCountStatsMonitor to monitor task count stats (#6657)

* Add TaskCountStatsMonitor to monitor task count stats

* address comments

* add file header

* tweak test
This commit is contained in:
Mingming Qiu 2018-12-05 05:37:17 +08:00 committed by Gian Merlino
parent ec36f0b82f
commit 607339003b
8 changed files with 350 additions and 2 deletions

View File

@ -25,7 +25,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.ArrayList;
import java.util.List;
class StubServiceEmitter extends ServiceEmitter
public class StubServiceEmitter extends ServiceEmitter
{
private List<Event> events = new ArrayList<>();

View File

@ -165,6 +165,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskId, taskType, interval.|Varies.|
|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.|dataSource, taskId, taskType, interval.|Varies.|
|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.|dataSource, taskId, taskType, interval.|Varies.|
|`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`task/running/count`|Number of currently running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`task/pending/count`|Number of currently pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`task/waiting/count`|Number of currently waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
## Coordination

View File

@ -41,14 +41,16 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import org.apache.druid.server.metrics.TaskCountStatsProvider;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
/**
* Encapsulates the indexer leadership lifecycle.
*/
public class TaskMaster
public class TaskMaster implements TaskCountStatsProvider
{
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
@ -265,4 +267,59 @@ public class TaskMaster
return Optional.absent();
}
}
/**
 * Delegates to the current {@link TaskQueue} for the per-datasource successful task counts.
 *
 * @return the counts reported by the task queue, or {@code null} when no task queue is available
 */
@Override
public Map<String, Long> getSuccessfulTaskCount()
{
  final Optional<TaskQueue> queue = getTaskQueue();
  return queue.isPresent() ? queue.get().getSuccessfulTaskCount() : null;
}
/**
 * Delegates to the current {@link TaskQueue} for the per-datasource failed task counts.
 *
 * @return the counts reported by the task queue, or {@code null} when no task queue is available
 */
@Override
public Map<String, Long> getFailedTaskCount()
{
  final Optional<TaskQueue> queue = getTaskQueue();
  return queue.isPresent() ? queue.get().getFailedTaskCount() : null;
}
/**
 * Delegates to the current {@link TaskQueue} for the per-datasource running task counts.
 *
 * @return the counts reported by the task queue, or {@code null} when no task queue is available
 */
@Override
public Map<String, Long> getRunningTaskCount()
{
  final Optional<TaskQueue> queue = getTaskQueue();
  return queue.isPresent() ? queue.get().getRunningTaskCount() : null;
}
/**
 * Delegates to the current {@link TaskQueue} for the per-datasource pending task counts.
 *
 * @return the counts reported by the task queue, or {@code null} when no task queue is available
 */
@Override
public Map<String, Long> getPendingTaskCount()
{
  final Optional<TaskQueue> queue = getTaskQueue();
  return queue.isPresent() ? queue.get().getPendingTaskCount() : null;
}
/**
 * Delegates to the current {@link TaskQueue} for the per-datasource waiting task counts.
 *
 * @return the counts reported by the task queue, or {@code null} when no task queue is available
 */
@Override
public Map<String, Long> getWaitingTaskCount()
{
  final Optional<TaskQueue> queue = getTaskQueue();
  return queue.isPresent() ? queue.get().getWaitingTaskCount() : null;
}
}

View File

@ -53,12 +53,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
* Interface between task producers and the task runner.
@ -100,6 +103,11 @@ public class TaskQueue
private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);
// Cumulative number of tasks that finished with a successful status, keyed by the task's datasource.
private final Map<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>();
// Cumulative number of tasks that finished with a non-successful status, keyed by the task's datasource.
private final Map<String, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>();
// Totals captured by the most recent getSuccessfulTaskCount() call; the next call reports the difference.
private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
// Totals captured by the most recent getFailedTaskCount() call; the next call reports the difference.
private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
@Inject
public TaskQueue(
TaskQueueConfig config,
@ -510,6 +518,14 @@ public class TaskQueue
task,
status.getDuration()
);
if (status.isSuccess()) {
totalSuccessfulTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
.incrementAndGet();
} else {
totalFailedTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
.incrementAndGet();
}
}
}
catch (Exception e) {
@ -586,4 +602,70 @@ public class TaskQueue
return rv;
}
/**
 * Computes, for every key of {@code total}, how much its value grew relative to {@code prev}.
 * Keys missing from {@code prev} are treated as having a previous value of zero.
 *
 * @param total current cumulative values
 * @param prev  cumulative values from the previous observation
 * @return a new map holding {@code total - prev} for each key of {@code total}
 */
private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
{
  final Map<String, Long> deltas = new HashMap<>();
  for (Map.Entry<String, Long> current : total.entrySet()) {
    final long previous = prev.getOrDefault(current.getKey(), 0L);
    deltas.put(current.getKey(), current.getValue() - previous);
  }
  return deltas;
}
/**
 * Reports, per datasource, how many tasks succeeded since the previous call of this method.
 * The snapshot taken here becomes the baseline for the next call.
 */
public Map<String, Long> getSuccessfulTaskCount()
{
  // Take a plain-value snapshot of the cumulative counters.
  final Map<String, Long> snapshot = new HashMap<>();
  totalSuccessfulTaskCount.forEach((dataSource, counter) -> snapshot.put(dataSource, counter.get()));

  final Map<String, Long> sinceLastCall = getDeltaValues(snapshot, prevTotalSuccessfulTaskCount);
  prevTotalSuccessfulTaskCount = snapshot;
  return sinceLastCall;
}
/**
 * Reports, per datasource, how many tasks failed since the previous call of this method.
 * The snapshot taken here becomes the baseline for the next call.
 */
public Map<String, Long> getFailedTaskCount()
{
  // Take a plain-value snapshot of the cumulative counters.
  final Map<String, Long> snapshot = new HashMap<>();
  totalFailedTaskCount.forEach((dataSource, counter) -> snapshot.put(dataSource, counter.get()));

  final Map<String, Long> sinceLastCall = getDeltaValues(snapshot, prevTotalFailedTaskCount);
  prevTotalFailedTaskCount = snapshot;
  return sinceLastCall;
}
/**
 * Counts the task runner's running tasks per datasource. A running task whose id is not
 * present in this queue's task list is counted under the empty-string datasource.
 */
public Map<String, Long> getRunningTaskCount()
{
  final Map<String, String> dataSourceByTaskId =
      tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));

  final Map<String, Long> runningPerDataSource = new HashMap<>();
  taskRunner.getRunningTasks().forEach(
      workItem -> runningPerDataSource.merge(
          dataSourceByTaskId.getOrDefault(workItem.getTaskId(), ""),
          1L,
          Long::sum
      )
  );
  return runningPerDataSource;
}
/**
 * Counts the task runner's pending tasks per datasource. A pending task whose id is not
 * present in this queue's task list is counted under the empty-string datasource.
 */
public Map<String, Long> getPendingTaskCount()
{
  final Map<String, String> dataSourceByTaskId =
      tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));

  final Map<String, Long> pendingPerDataSource = new HashMap<>();
  taskRunner.getPendingTasks().forEach(
      workItem -> pendingPerDataSource.merge(
          dataSourceByTaskId.getOrDefault(workItem.getTaskId(), ""),
          1L,
          Long::sum
      )
  );
  return pendingPerDataSource;
}
/**
 * Counts, per datasource, the tasks held by this queue whose ids are not among the
 * task runner's known tasks.
 */
public Map<String, Long> getWaitingTaskCount()
{
  final Set<String> idsKnownToRunner = taskRunner.getKnownTasks()
                                                 .stream()
                                                 .map(workItem -> workItem.getTaskId())
                                                 .collect(Collectors.toSet());

  final Map<String, Long> waitingPerDataSource = new HashMap<>();
  for (Task task : tasks) {
    if (idsKnownToRunner.contains(task.getId())) {
      // The runner already knows this task, so it is not waiting.
      continue;
    }
    waitingPerDataSource.merge(task.getDataSource(), 1L, Long::sum);
  }
  return waitingPerDataSource;
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import com.google.inject.Inject;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import java.util.Map;
/**
 * Monitor that emits the per-datasource task counts supplied by a {@link TaskCountStatsProvider}
 * as {@code task/*}{@code /count} metrics, one event per datasource and metric.
 */
public class TaskCountStatsMonitor extends AbstractMonitor
{
  private final TaskCountStatsProvider statsProvider;

  @Inject
  public TaskCountStatsMonitor(TaskCountStatsProvider statsProvider)
  {
    this.statsProvider = statsProvider;
  }

  /**
   * Emits the successful, failed, running, pending and waiting task counts, in that order.
   *
   * @return always {@code true}
   */
  @Override
  public boolean doMonitor(ServiceEmitter emitter)
  {
    emit(emitter, "task/success/count", statsProvider.getSuccessfulTaskCount());
    emit(emitter, "task/failed/count", statsProvider.getFailedTaskCount());
    emit(emitter, "task/running/count", statsProvider.getRunningTaskCount());
    emit(emitter, "task/pending/count", statsProvider.getPendingTaskCount());
    emit(emitter, "task/waiting/count", statsProvider.getWaitingTaskCount());
    return true;
  }

  /**
   * Emits one metric event named {@code key} per entry of {@code counts}, tagging each event
   * with the entry's key as the {@code dataSource} dimension. A {@code null} map emits nothing.
   */
  private void emit(ServiceEmitter emitter, String key, Map<String, Long> counts)
  {
    if (counts == null) {
      return;
    }
    final ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder();
    for (Map.Entry<String, Long> entry : counts.entrySet()) {
      // The builder is reused; setting the dimension again replaces the previous datasource.
      eventBuilder.setDimension("dataSource", entry.getKey());
      emitter.emit(eventBuilder.build(key, entry.getValue()));
    }
  }
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import java.util.Map;
/**
 * Source of per-datasource task counts. Every method returns a map from datasource name to a count.
 * NOTE(review): the TaskMaster implementation returns null when it has no task queue, and
 * TaskCountStatsMonitor skips emission for a null map, so callers should tolerate a null result.
 */
public interface TaskCountStatsProvider
{
/**
* Return the number of successful tasks for each datasource during the emission period.
*/
Map<String, Long> getSuccessfulTaskCount();
/**
* Return the number of failed tasks for each datasource during the emission period.
*/
Map<String, Long> getFailedTaskCount();
/**
* Return the number of currently running tasks for each datasource.
*/
Map<String, Long> getRunningTaskCount();
/**
* Return the number of currently pending tasks for each datasource.
*/
Map<String, Long> getPendingTaskCount();
/**
* Return the number of currently waiting tasks for each datasource.
*/
Map<String, Long> getWaitingTaskCount();
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
/**
 * Verifies that {@link TaskCountStatsMonitor} emits one event per metric, in a fixed order,
 * for a provider that reports a single task for datasource {@code d1} in every category.
 */
public class TaskCountStatsMonitorTest
{
  private TaskCountStatsProvider statsProvider;

  @Before
  public void setUp()
  {
    // Every category reports exactly one task for datasource "d1".
    final Map<String, Long> oneTaskForD1 = ImmutableMap.of("d1", 1L);
    statsProvider = new TaskCountStatsProvider()
    {
      @Override
      public Map<String, Long> getSuccessfulTaskCount()
      {
        return oneTaskForD1;
      }

      @Override
      public Map<String, Long> getFailedTaskCount()
      {
        return oneTaskForD1;
      }

      @Override
      public Map<String, Long> getRunningTaskCount()
      {
        return oneTaskForD1;
      }

      @Override
      public Map<String, Long> getPendingTaskCount()
      {
        return oneTaskForD1;
      }

      @Override
      public Map<String, Long> getWaitingTaskCount()
      {
        return oneTaskForD1;
      }
    };
  }

  @Test
  public void testMonitor()
  {
    // Metric names in the order the monitor is expected to emit them.
    final String[] expectedMetrics = {
        "task/success/count",
        "task/failed/count",
        "task/running/count",
        "task/pending/count",
        "task/waiting/count"
    };

    final TaskCountStatsMonitor monitor = new TaskCountStatsMonitor(statsProvider);
    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
    monitor.doMonitor(emitter);

    Assert.assertEquals(expectedMetrics.length, emitter.getEvents().size());
    for (int i = 0; i < expectedMetrics.length; i++) {
      final Map<String, Object> event = emitter.getEvents().get(i).toMap();
      Assert.assertEquals(expectedMetrics[i], event.get("metric"));
      Assert.assertEquals(1L, event.get("value"));
    }
  }
}

View File

@ -100,6 +100,7 @@ import org.apache.druid.server.http.RedirectInfo;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.TaskCountStatsProvider;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationUtils;
import org.apache.druid.server.security.Authenticator;
@ -170,6 +171,7 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
binder.bind(