mirror of https://github.com/apache/druid.git
Adding task slot count metrics to Druid Overlord (#10379)
* Adding more worker metrics to Druid Overlord * Changing the nomenclature from worker to peon as that represents the metrics that we want to monitor better * Few more instance of worker usage replaced with peon * Modifying the peon idle count logic to only use eligible workers available capacity * Changing the naming to task slot count instead of peon * Adding some unit test coverage for the new test runner apis * Addressing Review Comments * Modifying the TaskSlotCountStatsProvider apis so that overlords which are not leader do not emit these metrics * Fixing the spelling issue in the docs * Setting the annotation Nullable on the TaskSlotCountStatsProvider methods
This commit is contained in:
parent
729bcba7ac
commit
8168e14e92
|
@ -196,6 +196,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|
||||||
|`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|
|`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|
||||||
|`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|
|`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|
||||||
|`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|
|`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|
||||||
|
|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|
||||||
|
|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|
||||||
|
|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|
||||||
|
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|
||||||
|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
|
||||||
|
|
||||||
## Coordination
|
## Coordination
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,12 @@
|
||||||
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "count" },
|
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "count" },
|
||||||
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "count" },
|
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "count" },
|
||||||
|
|
||||||
|
"taskSlot/total/count" : { "dimensions" : [], "type" : "count" },
|
||||||
|
"taskSlot/idle/count" : { "dimensions" : [], "type" : "count" },
|
||||||
|
"taskSlot/busy/count" : { "dimensions" : [], "type" : "count" },
|
||||||
|
"taskSlot/lazy/count" : { "dimensions" : [], "type" : "count" },
|
||||||
|
"taskSlot/blacklisted/count" : { "dimensions" : [], "type" : "count" },
|
||||||
|
|
||||||
"task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" },
|
"task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" },
|
||||||
"segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
|
"segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
|
||||||
"segment/moved/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
|
"segment/moved/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
|
||||||
|
|
|
@ -649,6 +649,39 @@ public class ForkingTaskRunner
|
||||||
return Joiner.on(" ").join(maskedIterator);
|
return Joiner.on(" ").join(maskedIterator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTotalTaskSlotCount()
|
||||||
|
{
|
||||||
|
if (config.getPorts() != null && !config.getPorts().isEmpty()) {
|
||||||
|
return config.getPorts().size();
|
||||||
|
}
|
||||||
|
return config.getEndPort() - config.getStartPort() + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getIdleTaskSlotCount()
|
||||||
|
{
|
||||||
|
return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUsedTaskSlotCount()
|
||||||
|
{
|
||||||
|
return portFinder.findUsedPortCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLazyTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBlacklistedTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
|
protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
|
||||||
{
|
{
|
||||||
private final Task task;
|
private final Task task;
|
||||||
|
|
|
@ -79,6 +79,11 @@ public class PortFinder
|
||||||
usedPorts.remove(port);
|
usedPorts.remove(port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized int findUsedPortCount()
|
||||||
|
{
|
||||||
|
return usedPorts.size();
|
||||||
|
}
|
||||||
|
|
||||||
private int chooseFromCandidates()
|
private int chooseFromCandidates()
|
||||||
{
|
{
|
||||||
for (int port : candidatePorts) {
|
for (int port : candidatePorts) {
|
||||||
|
|
|
@ -823,17 +823,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
synchronized (workersWithUnacknowledgedTask) {
|
synchronized (workersWithUnacknowledgedTask) {
|
||||||
immutableZkWorker = strategy.findWorkerForTask(
|
immutableZkWorker = strategy.findWorkerForTask(
|
||||||
config,
|
config,
|
||||||
ImmutableMap.copyOf(
|
ImmutableMap.copyOf(getWorkersEligibleToRunTasks()),
|
||||||
Maps.transformEntries(
|
|
||||||
Maps.filterEntries(
|
|
||||||
zkWorkers,
|
|
||||||
input -> !lazyWorkers.containsKey(input.getKey()) &&
|
|
||||||
!workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
|
|
||||||
!blackListedWorkers.contains(input.getValue())
|
|
||||||
),
|
|
||||||
(String key, ZkWorker value) -> value.toImmutable()
|
|
||||||
)
|
|
||||||
),
|
|
||||||
task
|
task
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -867,6 +857,19 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Map<String, ImmutableWorkerInfo> getWorkersEligibleToRunTasks()
|
||||||
|
{
|
||||||
|
return Maps.transformEntries(
|
||||||
|
Maps.filterEntries(
|
||||||
|
zkWorkers,
|
||||||
|
input -> !lazyWorkers.containsKey(input.getKey()) &&
|
||||||
|
!workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
|
||||||
|
!blackListedWorkers.contains(input.getValue())
|
||||||
|
),
|
||||||
|
(String key, ZkWorker value) -> value.toImmutable()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
|
* Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
|
||||||
* removing the task ZK entry and creating a task status ZK entry.
|
* removing the task ZK entry and creating a task status ZK entry.
|
||||||
|
@ -1434,4 +1437,59 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
{
|
{
|
||||||
return workersWithUnacknowledgedTask;
|
return workersWithUnacknowledgedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTotalTaskSlotCount()
|
||||||
|
{
|
||||||
|
long totalPeons = 0;
|
||||||
|
for (ImmutableWorkerInfo worker : getWorkers()) {
|
||||||
|
totalPeons += worker.getWorker().getCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalPeons;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getIdleTaskSlotCount()
|
||||||
|
{
|
||||||
|
long totalIdlePeons = 0;
|
||||||
|
for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) {
|
||||||
|
totalIdlePeons += worker.getAvailableCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalIdlePeons;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUsedTaskSlotCount()
|
||||||
|
{
|
||||||
|
long totalUsedPeons = 0;
|
||||||
|
for (ImmutableWorkerInfo worker : getWorkers()) {
|
||||||
|
totalUsedPeons += worker.getCurrCapacityUsed();
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalUsedPeons;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLazyTaskSlotCount()
|
||||||
|
{
|
||||||
|
long totalLazyPeons = 0;
|
||||||
|
for (Worker worker : getLazyWorkers()) {
|
||||||
|
totalLazyPeons += worker.getCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalLazyPeons;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBlacklistedTaskSlotCount()
|
||||||
|
{
|
||||||
|
long totalBlacklistedPeons = 0;
|
||||||
|
for (ImmutableWorkerInfo worker : getBlackListedWorkers()) {
|
||||||
|
totalBlacklistedPeons += worker.getWorker().getCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalBlacklistedPeons;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -313,6 +313,36 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
|
||||||
return Optional.absent();
|
return Optional.absent();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTotalTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getIdleTaskSlotCount()
|
||||||
|
{
|
||||||
|
return runningItem == null ? 1 : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUsedTaskSlotCount()
|
||||||
|
{
|
||||||
|
return runningItem == null ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLazyTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBlacklistedTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
|
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
|
||||||
{
|
{
|
||||||
|
|
|
@ -42,7 +42,9 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||||
import org.apache.druid.server.DruidNode;
|
import org.apache.druid.server.DruidNode;
|
||||||
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
|
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
|
||||||
import org.apache.druid.server.metrics.TaskCountStatsProvider;
|
import org.apache.druid.server.metrics.TaskCountStatsProvider;
|
||||||
|
import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
@ -50,7 +52,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||||
/**
|
/**
|
||||||
* Encapsulates the indexer leadership lifecycle.
|
* Encapsulates the indexer leadership lifecycle.
|
||||||
*/
|
*/
|
||||||
public class TaskMaster implements TaskCountStatsProvider
|
public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsProvider
|
||||||
{
|
{
|
||||||
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
|
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
|
||||||
|
|
||||||
|
@ -338,4 +340,64 @@ public class TaskMaster implements TaskCountStatsProvider
|
||||||
// fail silently since we are stopping anyway
|
// fail silently since we are stopping anyway
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public Long getTotalTaskSlotCount()
|
||||||
|
{
|
||||||
|
Optional<TaskRunner> taskRunner = getTaskRunner();
|
||||||
|
if (taskRunner.isPresent()) {
|
||||||
|
return taskRunner.get().getTotalTaskSlotCount();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public Long getIdleTaskSlotCount()
|
||||||
|
{
|
||||||
|
Optional<TaskRunner> taskRunner = getTaskRunner();
|
||||||
|
if (taskRunner.isPresent()) {
|
||||||
|
return taskRunner.get().getIdleTaskSlotCount();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public Long getUsedTaskSlotCount()
|
||||||
|
{
|
||||||
|
Optional<TaskRunner> taskRunner = getTaskRunner();
|
||||||
|
if (taskRunner.isPresent()) {
|
||||||
|
return taskRunner.get().getUsedTaskSlotCount();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public Long getLazyTaskSlotCount()
|
||||||
|
{
|
||||||
|
Optional<TaskRunner> taskRunner = getTaskRunner();
|
||||||
|
if (taskRunner.isPresent()) {
|
||||||
|
return taskRunner.get().getLazyTaskSlotCount();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Nullable
|
||||||
|
public Long getBlacklistedTaskSlotCount()
|
||||||
|
{
|
||||||
|
Optional<TaskRunner> taskRunner = getTaskRunner();
|
||||||
|
if (taskRunner.isPresent()) {
|
||||||
|
return taskRunner.get().getBlacklistedTaskSlotCount();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,4 +121,17 @@ public interface TaskRunner
|
||||||
* @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
|
* @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
|
||||||
*/
|
*/
|
||||||
Optional<ScalingStats> getScalingStats();
|
Optional<ScalingStats> getScalingStats();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* APIs useful for emitting statistics for @TaskSlotCountStatsMonitor
|
||||||
|
*/
|
||||||
|
long getTotalTaskSlotCount();
|
||||||
|
|
||||||
|
long getIdleTaskSlotCount();
|
||||||
|
|
||||||
|
long getUsedTaskSlotCount();
|
||||||
|
|
||||||
|
long getLazyTaskSlotCount();
|
||||||
|
|
||||||
|
long getBlacklistedTaskSlotCount();
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,6 +95,7 @@ public class ThreadingTaskRunner
|
||||||
private final MultipleFileTaskReportFileWriter taskReportFileWriter;
|
private final MultipleFileTaskReportFileWriter taskReportFileWriter;
|
||||||
private final ListeningExecutorService taskExecutor;
|
private final ListeningExecutorService taskExecutor;
|
||||||
private final ListeningExecutorService controlThreadExecutor;
|
private final ListeningExecutorService controlThreadExecutor;
|
||||||
|
private final WorkerConfig workerConfig;
|
||||||
|
|
||||||
private volatile boolean stopping = false;
|
private volatile boolean stopping = false;
|
||||||
|
|
||||||
|
@ -116,6 +117,7 @@ public class ThreadingTaskRunner
|
||||||
this.node = node;
|
this.node = node;
|
||||||
this.appenderatorsManager = appenderatorsManager;
|
this.appenderatorsManager = appenderatorsManager;
|
||||||
this.taskReportFileWriter = (MultipleFileTaskReportFileWriter) taskReportFileWriter;
|
this.taskReportFileWriter = (MultipleFileTaskReportFileWriter) taskReportFileWriter;
|
||||||
|
this.workerConfig = workerConfig;
|
||||||
this.taskExecutor = MoreExecutors.listeningDecorator(
|
this.taskExecutor = MoreExecutors.listeningDecorator(
|
||||||
Execs.multiThreaded(workerConfig.getCapacity(), "threading-task-runner-executor-%d")
|
Execs.multiThreaded(workerConfig.getCapacity(), "threading-task-runner-executor-%d")
|
||||||
);
|
);
|
||||||
|
@ -451,6 +453,36 @@ public class ThreadingTaskRunner
|
||||||
return Optional.absent();
|
return Optional.absent();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTotalTaskSlotCount()
|
||||||
|
{
|
||||||
|
return workerConfig.getCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getIdleTaskSlotCount()
|
||||||
|
{
|
||||||
|
return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUsedTaskSlotCount()
|
||||||
|
{
|
||||||
|
return getRunningTasks().size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLazyTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBlacklistedTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> QueryRunner<T> getQueryRunnerForIntervals(
|
public <T> QueryRunner<T> getQueryRunnerForIntervals(
|
||||||
Query<T> query,
|
Query<T> query,
|
||||||
|
|
|
@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
|
import com.google.common.collect.Collections2;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
@ -1342,6 +1343,11 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
).collect(Collectors.toList());
|
).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Collection<ImmutableWorkerInfo> getBlackListedWorkers()
|
||||||
|
{
|
||||||
|
return ImmutableList.copyOf(Collections2.transform(blackListedWorkers.values(), WorkerHolder::toImmutable));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} , used for read only.
|
* Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} , used for read only.
|
||||||
*/
|
*/
|
||||||
|
@ -1547,6 +1553,61 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTotalTaskSlotCount()
|
||||||
|
{
|
||||||
|
long totalPeons = 0;
|
||||||
|
for (ImmutableWorkerInfo worker : getWorkers()) {
|
||||||
|
totalPeons += worker.getWorker().getCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalPeons;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getIdleTaskSlotCount()
|
||||||
|
{
|
||||||
|
long totalIdlePeons = 0;
|
||||||
|
for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) {
|
||||||
|
totalIdlePeons += worker.getAvailableCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalIdlePeons;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUsedTaskSlotCount()
|
||||||
|
{
|
||||||
|
long totalUsedPeons = 0;
|
||||||
|
for (ImmutableWorkerInfo worker : getWorkers()) {
|
||||||
|
totalUsedPeons += worker.getCurrCapacityUsed();
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalUsedPeons;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLazyTaskSlotCount()
|
||||||
|
{
|
||||||
|
long totalLazyPeons = 0;
|
||||||
|
for (Worker worker : getLazyWorkers()) {
|
||||||
|
totalLazyPeons += worker.getCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalLazyPeons;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBlacklistedTaskSlotCount()
|
||||||
|
{
|
||||||
|
long totalBlacklistedPeons = 0;
|
||||||
|
for (ImmutableWorkerInfo worker : getBlackListedWorkers()) {
|
||||||
|
totalBlacklistedPeons += worker.getWorker().getCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalBlacklistedPeons;
|
||||||
|
}
|
||||||
|
|
||||||
private static class HttpRemoteTaskRunnerWorkItem extends RemoteTaskRunnerWorkItem
|
private static class HttpRemoteTaskRunnerWorkItem extends RemoteTaskRunnerWorkItem
|
||||||
{
|
{
|
||||||
enum State
|
enum State
|
||||||
|
|
|
@ -383,5 +383,35 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
|
||||||
{
|
{
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTotalTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getIdleTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUsedTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLazyTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBlacklistedTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,16 +104,26 @@ public class RemoteTaskRunnerTest
|
||||||
{
|
{
|
||||||
doSetup();
|
doSetup();
|
||||||
|
|
||||||
|
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount());
|
||||||
|
|
||||||
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
|
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
|
||||||
|
|
||||||
Assert.assertTrue(taskAnnounced(task.getId()));
|
Assert.assertTrue(taskAnnounced(task.getId()));
|
||||||
mockWorkerRunningTask(task);
|
mockWorkerRunningTask(task);
|
||||||
Assert.assertTrue(workerRunningTask(task.getId()));
|
Assert.assertTrue(workerRunningTask(task.getId()));
|
||||||
|
|
||||||
mockWorkerCompleteSuccessfulTask(task);
|
mockWorkerCompleteSuccessfulTask(task);
|
||||||
Assert.assertTrue(workerCompletedTask(result));
|
Assert.assertTrue(workerCompletedTask(result));
|
||||||
|
|
||||||
Assert.assertEquals(task.getId(), result.get().getId());
|
Assert.assertEquals(task.getId(), result.get().getId());
|
||||||
Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
|
Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
|
||||||
|
|
||||||
|
cf.delete().guaranteed().forPath(JOINER.join(STATUS_PATH, task.getId()));
|
||||||
|
|
||||||
|
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -421,6 +431,9 @@ public class RemoteTaskRunnerTest
|
||||||
public void testWorkerRemoved() throws Exception
|
public void testWorkerRemoved() throws Exception
|
||||||
{
|
{
|
||||||
doSetup();
|
doSetup();
|
||||||
|
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
|
||||||
|
|
||||||
Future<TaskStatus> future = remoteTaskRunner.run(task);
|
Future<TaskStatus> future = remoteTaskRunner.run(task);
|
||||||
|
|
||||||
Assert.assertTrue(taskAnnounced(task.getId()));
|
Assert.assertTrue(taskAnnounced(task.getId()));
|
||||||
|
@ -449,6 +462,9 @@ public class RemoteTaskRunnerTest
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
Assert.assertNull(cf.checkExists().forPath(STATUS_PATH));
|
Assert.assertNull(cf.checkExists().forPath(STATUS_PATH));
|
||||||
|
|
||||||
|
Assert.assertEquals(0, remoteTaskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -621,6 +637,9 @@ public class RemoteTaskRunnerTest
|
||||||
);
|
);
|
||||||
Assert.assertEquals(1, lazyworkers.size());
|
Assert.assertEquals(1, lazyworkers.size());
|
||||||
Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());
|
Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());
|
||||||
|
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount());
|
||||||
|
Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -931,10 +950,12 @@ public class RemoteTaskRunnerTest
|
||||||
mockWorkerCompleteFailedTask(task1);
|
mockWorkerCompleteFailedTask(task1);
|
||||||
Assert.assertTrue(taskFuture1.get().isFailure());
|
Assert.assertTrue(taskFuture1.get().isFailure());
|
||||||
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
|
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
|
||||||
|
Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
|
||||||
|
|
||||||
Future<TaskStatus> taskFuture2 = remoteTaskRunner.run(task2);
|
Future<TaskStatus> taskFuture2 = remoteTaskRunner.run(task2);
|
||||||
Assert.assertTrue(taskAnnounced(task2.getId()));
|
Assert.assertTrue(taskAnnounced(task2.getId()));
|
||||||
mockWorkerRunningTask(task2);
|
mockWorkerRunningTask(task2);
|
||||||
|
Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
|
||||||
|
|
||||||
Future<TaskStatus> taskFuture3 = remoteTaskRunner.run(task3);
|
Future<TaskStatus> taskFuture3 = remoteTaskRunner.run(task3);
|
||||||
Assert.assertTrue(taskAnnounced(task3.getId()));
|
Assert.assertTrue(taskAnnounced(task3.getId()));
|
||||||
|
@ -942,10 +963,12 @@ public class RemoteTaskRunnerTest
|
||||||
mockWorkerCompleteFailedTask(task3);
|
mockWorkerCompleteFailedTask(task3);
|
||||||
Assert.assertTrue(taskFuture3.get().isFailure());
|
Assert.assertTrue(taskFuture3.get().isFailure());
|
||||||
Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
|
Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
|
||||||
|
Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount());
|
||||||
|
|
||||||
mockWorkerCompleteSuccessfulTask(task2);
|
mockWorkerCompleteSuccessfulTask(task2);
|
||||||
Assert.assertTrue(taskFuture2.get().isSuccess());
|
Assert.assertTrue(taskFuture2.get().isSuccess());
|
||||||
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
|
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
|
||||||
|
Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -268,6 +268,36 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
|
||||||
return Optional.absent();
|
return Optional.absent();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTotalTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getIdleTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUsedTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLazyTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBlacklistedTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start()
|
public void start()
|
||||||
{
|
{
|
||||||
|
|
|
@ -842,6 +842,10 @@ public class HttpRemoteTaskRunnerTest
|
||||||
|
|
||||||
taskRunner.start();
|
taskRunner.start();
|
||||||
|
|
||||||
|
Assert.assertEquals(0, taskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount());
|
||||||
|
|
||||||
AtomicInteger ticks = new AtomicInteger();
|
AtomicInteger ticks = new AtomicInteger();
|
||||||
|
|
||||||
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
|
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
|
||||||
|
@ -884,12 +888,20 @@ public class HttpRemoteTaskRunnerTest
|
||||||
|
|
||||||
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
|
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
|
||||||
|
|
||||||
|
Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount());
|
||||||
|
|
||||||
taskRunner.run(task1);
|
taskRunner.run(task1);
|
||||||
|
|
||||||
while (ticks.get() < 1) {
|
while (ticks.get() < 1) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
|
||||||
|
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
|
||||||
|
|
||||||
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
|
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
|
||||||
new DruidNode("service", "host2", false, 8080, null, true, false),
|
new DruidNode("service", "host2", false, 8080, null, true, false),
|
||||||
NodeRole.MIDDLE_MANAGER,
|
NodeRole.MIDDLE_MANAGER,
|
||||||
|
@ -918,12 +930,20 @@ public class HttpRemoteTaskRunnerTest
|
||||||
|
|
||||||
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2));
|
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2));
|
||||||
|
|
||||||
|
Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount());
|
||||||
|
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
|
||||||
|
|
||||||
taskRunner.run(task2);
|
taskRunner.run(task2);
|
||||||
|
|
||||||
while (ticks.get() < 2) {
|
while (ticks.get() < 2) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
|
||||||
|
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
|
||||||
|
|
||||||
DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode(
|
DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode(
|
||||||
new DruidNode("service", "host3", false, 8080, null, true, false),
|
new DruidNode("service", "host3", false, 8080, null, true, false),
|
||||||
NodeRole.MIDDLE_MANAGER,
|
NodeRole.MIDDLE_MANAGER,
|
||||||
|
@ -952,6 +972,11 @@ public class HttpRemoteTaskRunnerTest
|
||||||
|
|
||||||
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3));
|
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3));
|
||||||
|
|
||||||
|
Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount());
|
||||||
|
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount());
|
||||||
|
|
||||||
Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
|
Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
|
||||||
Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
|
Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
|
||||||
|
|
||||||
|
@ -960,6 +985,11 @@ public class HttpRemoteTaskRunnerTest
|
||||||
Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), Integer.MAX_VALUE))
|
Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), Integer.MAX_VALUE))
|
||||||
.getHost()
|
.getHost()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount());
|
||||||
|
Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
|
||||||
|
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
|
||||||
|
Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -452,6 +452,36 @@ public class OverlordTest
|
||||||
return Optional.absent();
|
return Optional.absent();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getTotalTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getIdleTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUsedTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLazyTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBlacklistedTaskSlotCount()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start()
|
public void start()
|
||||||
{
|
{
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.metrics;
|
||||||
|
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||||
|
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||||
|
import org.apache.druid.java.util.metrics.AbstractMonitor;
|
||||||
|
|
||||||
|
public class TaskSlotCountStatsMonitor extends AbstractMonitor
|
||||||
|
{
|
||||||
|
private final TaskSlotCountStatsProvider statsProvider;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public TaskSlotCountStatsMonitor(
|
||||||
|
TaskSlotCountStatsProvider statsProvider
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.statsProvider = statsProvider;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean doMonitor(ServiceEmitter emitter)
|
||||||
|
{
|
||||||
|
emit(emitter, "taskSlot/total/count", statsProvider.getTotalTaskSlotCount());
|
||||||
|
emit(emitter, "taskSlot/idle/count", statsProvider.getIdleTaskSlotCount());
|
||||||
|
emit(emitter, "taskSlot/used/count", statsProvider.getUsedTaskSlotCount());
|
||||||
|
emit(emitter, "taskSlot/lazy/count", statsProvider.getLazyTaskSlotCount());
|
||||||
|
emit(emitter, "taskSlot/blacklisted/count", statsProvider.getBlacklistedTaskSlotCount());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void emit(ServiceEmitter emitter, String key, Long count)
|
||||||
|
{
|
||||||
|
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
|
||||||
|
if (count != null) {
|
||||||
|
emitter.emit(builder.build(key, count.longValue()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.metrics;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
public interface TaskSlotCountStatsProvider
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Return the number of total task slots during emission period.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
Long getTotalTaskSlotCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the number of idle task slots during emission period.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
Long getIdleTaskSlotCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the number of used task slots during emission period.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
Long getUsedTaskSlotCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the total number of task slots in lazy marked middlemanagers and indexers during emission period.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
Long getLazyTaskSlotCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the total number of task slots in blacklisted middlemanagers and indexers during emission period.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
Long getBlacklistedTaskSlotCount();
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.metrics;
|
||||||
|
|
||||||
|
import org.apache.druid.java.util.metrics.StubServiceEmitter;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TaskSlotCountStatsMonitorTest
|
||||||
|
{
|
||||||
|
private TaskSlotCountStatsProvider statsProvider;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp()
|
||||||
|
{
|
||||||
|
statsProvider = new TaskSlotCountStatsProvider()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Long getTotalTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 1L;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getIdleTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 1L;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getUsedTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 1L;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getLazyTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 1L;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getBlacklistedTaskSlotCount()
|
||||||
|
{
|
||||||
|
return 1L;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMonitor()
|
||||||
|
{
|
||||||
|
final TaskSlotCountStatsMonitor monitor = new TaskSlotCountStatsMonitor(statsProvider);
|
||||||
|
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
|
||||||
|
monitor.doMonitor(emitter);
|
||||||
|
Assert.assertEquals(5, emitter.getEvents().size());
|
||||||
|
Assert.assertEquals("taskSlot/total/count", emitter.getEvents().get(0).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(1L, emitter.getEvents().get(0).toMap().get("value"));
|
||||||
|
Assert.assertEquals("taskSlot/idle/count", emitter.getEvents().get(1).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(1L, emitter.getEvents().get(1).toMap().get("value"));
|
||||||
|
Assert.assertEquals("taskSlot/used/count", emitter.getEvents().get(2).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(1L, emitter.getEvents().get(2).toMap().get("value"));
|
||||||
|
Assert.assertEquals("taskSlot/lazy/count", emitter.getEvents().get(3).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(1L, emitter.getEvents().get(3).toMap().get("value"));
|
||||||
|
Assert.assertEquals("taskSlot/blacklisted/count", emitter.getEvents().get(4).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value"));
|
||||||
|
}
|
||||||
|
}
|
|
@ -113,6 +113,7 @@ import org.apache.druid.server.initialization.ServerConfig;
|
||||||
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
|
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
|
||||||
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
|
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
|
||||||
import org.apache.druid.server.metrics.TaskCountStatsProvider;
|
import org.apache.druid.server.metrics.TaskCountStatsProvider;
|
||||||
|
import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
|
||||||
import org.apache.druid.server.security.AuthConfig;
|
import org.apache.druid.server.security.AuthConfig;
|
||||||
import org.apache.druid.server.security.AuthenticationUtils;
|
import org.apache.druid.server.security.AuthenticationUtils;
|
||||||
import org.apache.druid.server.security.Authenticator;
|
import org.apache.druid.server.security.Authenticator;
|
||||||
|
@ -179,6 +180,7 @@ public class CliOverlord extends ServerRunnable
|
||||||
|
|
||||||
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
|
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
|
||||||
binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
|
binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
|
||||||
|
binder.bind(TaskSlotCountStatsProvider.class).to(TaskMaster.class);
|
||||||
|
|
||||||
binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
|
binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
|
||||||
binder.bind(
|
binder.bind(
|
||||||
|
|
|
@ -1192,6 +1192,7 @@ RealtimeMetricsMonitor
|
||||||
Sys
|
Sys
|
||||||
SysMonitor
|
SysMonitor
|
||||||
TaskCountStatsMonitor
|
TaskCountStatsMonitor
|
||||||
|
TaskSlotCountStatsMonitor
|
||||||
bufferCapacity
|
bufferCapacity
|
||||||
bufferpoolName
|
bufferpoolName
|
||||||
cms
|
cms
|
||||||
|
|
Loading…
Reference in New Issue