Add worker category dimension (#11554)

* Add worker category as dimension in TaskSlotCountStatsMonitor

* Change description

* Add workerConfig as field

* Modify HttpRemoteTaskRunnerTest to test worker category in taskslot metrics

* Fixing tests

* Fixing alerts

* Adding unit test in SingleTaskBackgroundRunnerTest for task slot metrics APIs

* Resolving false positive spell check

* addressing comments

* throw UnsupportedOperationException for tasklotmetrics APIs in SingleTaskBackgroundRunner

Co-authored-by: Nikhil Navadiya <nnavadiya@twitter.com>
This commit is contained in:
Nikhil Navadiya 2021-11-18 22:59:07 -08:00 committed by GitHub
parent a4353aa1f4
commit 3c51136098
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 278 additions and 161 deletions

View File

@ -210,11 +210,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| |`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| |`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| |`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| |`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| |`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
## Shuffle metrics (Native parallel task) ## Shuffle metrics (Native parallel task)

View File

@ -63,11 +63,11 @@
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"taskSlot/total/count" : { "dimensions" : [], "type" : "gauge" }, "taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/idle/count" : { "dimensions" : [], "type" : "gauge" }, "taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/busy/count" : { "dimensions" : [], "type" : "gauge" }, "taskSlot/busy/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/lazy/count" : { "dimensions" : [], "type" : "gauge" }, "taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/blacklisted/count" : { "dimensions" : [], "type" : "gauge" }, "taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge" },
"task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" }, "task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" },
"segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" }, "segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },

View File

@ -27,6 +27,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -98,6 +99,7 @@ public class ForkingTaskRunner
private final ListeningExecutorService exec; private final ListeningExecutorService exec;
private final PortFinder portFinder; private final PortFinder portFinder;
private final StartupLoggingConfig startupLoggingConfig; private final StartupLoggingConfig startupLoggingConfig;
private final WorkerConfig workerConfig;
private volatile boolean stopping = false; private volatile boolean stopping = false;
@ -120,6 +122,7 @@ public class ForkingTaskRunner
this.node = node; this.node = node;
this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts()); this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts());
this.startupLoggingConfig = startupLoggingConfig; this.startupLoggingConfig = startupLoggingConfig;
this.workerConfig = workerConfig;
this.exec = MoreExecutors.listeningDecorator( this.exec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d") Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d")
); );
@ -666,7 +669,15 @@ public class ForkingTaskRunner
} }
@Override @Override
public long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{
if (config.getPorts() != null && !config.getPorts().isEmpty()) {
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getPorts().size()));
}
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1));
}
public long getTotalTaskSlotCountLong()
{ {
if (config.getPorts() != null && !config.getPorts().isEmpty()) { if (config.getPorts() != null && !config.getPorts().isEmpty()) {
return config.getPorts().size(); return config.getPorts().size();
@ -675,27 +686,32 @@ public class ForkingTaskRunner
} }
@Override @Override
public long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0); return ImmutableMap.of(workerConfig.getCategory(), Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0));
} }
@Override @Override
public long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(portFinder.findUsedPortCount()));
}
public long getUsedTaskSlotCountLong()
{ {
return portFinder.findUsedPortCount(); return portFinder.findUsedPortCount();
} }
@Override @Override
public long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
return 0; return ImmutableMap.of(workerConfig.getCategory(), 0L);
} }
@Override @Override
public long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
return 0; return ImmutableMap.of(workerConfig.getCategory(), 0L);
} }
protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem

View File

@ -95,6 +95,7 @@ import java.net.URL;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -1514,55 +1515,80 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
} }
@Override @Override
public long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{ {
long totalPeons = 0; Map<String, Long> totalPeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getWorkers()) { for (ImmutableWorkerInfo worker : getWorkers()) {
totalPeons += worker.getWorker().getCapacity(); String workerCategory = worker.getWorker().getCategory();
int workerCapacity = worker.getWorker().getCapacity();
totalPeons.compute(
workerCategory,
(category, totalCapacity) -> totalCapacity == null ? workerCapacity : totalCapacity + workerCapacity
);
} }
return totalPeons; return totalPeons;
} }
@Override @Override
public long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
long totalIdlePeons = 0; Map<String, Long> totalIdlePeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) {
totalIdlePeons += worker.getAvailableCapacity(); String workerCategory = worker.getWorker().getCategory();
int workerAvailableCapacity = worker.getAvailableCapacity();
totalIdlePeons.compute(
workerCategory,
(category, availableCapacity) -> availableCapacity == null ? workerAvailableCapacity : availableCapacity + workerAvailableCapacity
);
} }
return totalIdlePeons; return totalIdlePeons;
} }
@Override @Override
public long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{ {
long totalUsedPeons = 0; Map<String, Long> totalUsedPeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getWorkers()) { for (ImmutableWorkerInfo worker : getWorkers()) {
totalUsedPeons += worker.getCurrCapacityUsed(); String workerCategory = worker.getWorker().getCategory();
int workerUsedCapacity = worker.getCurrCapacityUsed();
totalUsedPeons.compute(
workerCategory,
(category, usedCapacity) -> usedCapacity == null ? workerUsedCapacity : usedCapacity + workerUsedCapacity
);
} }
return totalUsedPeons; return totalUsedPeons;
} }
@Override @Override
public long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
long totalLazyPeons = 0; Map<String, Long> totalLazyPeons = new HashMap<>();
for (Worker worker : getLazyWorkers()) { for (Worker worker : getLazyWorkers()) {
totalLazyPeons += worker.getCapacity(); String workerCategory = worker.getCategory();
int workerLazyPeons = worker.getCapacity();
totalLazyPeons.compute(
workerCategory,
(category, lazyPeons) -> lazyPeons == null ? workerLazyPeons : lazyPeons + workerLazyPeons
);
} }
return totalLazyPeons; return totalLazyPeons;
} }
@Override @Override
public long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
long totalBlacklistedPeons = 0; Map<String, Long> totalBlacklistedPeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { for (ImmutableWorkerInfo worker : getBlackListedWorkers()) {
totalBlacklistedPeons += worker.getWorker().getCapacity(); String workerCategory = worker.getWorker().getCategory();
int workerBlacklistedPeons = worker.getWorker().getCapacity();
totalBlacklistedPeons.compute(
workerCategory,
(category, blacklistedPeons) -> blacklistedPeons == null ? workerBlacklistedPeons : blacklistedPeons + workerBlacklistedPeons
);
} }
return totalBlacklistedPeons; return totalBlacklistedPeons;

View File

@ -59,6 +59,7 @@ import org.joda.time.Interval;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -334,34 +335,39 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
return Optional.absent(); return Optional.absent();
} }
/* This method should be never called in peons */
@Override @Override
public long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{ {
return 1; throw new UnsupportedOperationException();
} }
/* This method should be never called in peons */
@Override @Override
public long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
return runningItem == null ? 1 : 0; throw new UnsupportedOperationException();
} }
/* This method should be never called in peons */
@Override @Override
public long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{ {
return runningItem == null ? 0 : 1; throw new UnsupportedOperationException();
} }
/* This method should be never called in peons */
@Override @Override
public long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
return 0; throw new UnsupportedOperationException();
} }
/* This method should be never called in peons */
@Override @Override
public long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
return 0; throw new UnsupportedOperationException();
} }
@Override @Override

View File

@ -346,7 +346,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
@Override @Override
@Nullable @Nullable
public Long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{ {
Optional<TaskRunner> taskRunner = getTaskRunner(); Optional<TaskRunner> taskRunner = getTaskRunner();
if (taskRunner.isPresent()) { if (taskRunner.isPresent()) {
@ -358,7 +358,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
@Override @Override
@Nullable @Nullable
public Long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
Optional<TaskRunner> taskRunner = getTaskRunner(); Optional<TaskRunner> taskRunner = getTaskRunner();
if (taskRunner.isPresent()) { if (taskRunner.isPresent()) {
@ -370,7 +370,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
@Override @Override
@Nullable @Nullable
public Long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{ {
Optional<TaskRunner> taskRunner = getTaskRunner(); Optional<TaskRunner> taskRunner = getTaskRunner();
if (taskRunner.isPresent()) { if (taskRunner.isPresent()) {
@ -382,7 +382,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
@Override @Override
@Nullable @Nullable
public Long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
Optional<TaskRunner> taskRunner = getTaskRunner(); Optional<TaskRunner> taskRunner = getTaskRunner();
if (taskRunner.isPresent()) { if (taskRunner.isPresent()) {
@ -394,7 +394,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
@Override @Override
@Nullable @Nullable
public Long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
Optional<TaskRunner> taskRunner = getTaskRunner(); Optional<TaskRunner> taskRunner = getTaskRunner();
if (taskRunner.isPresent()) { if (taskRunner.isPresent()) {

View File

@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**
@ -124,14 +125,14 @@ public interface TaskRunner
/** /**
* APIs useful for emitting statistics for @TaskSlotCountStatsMonitor * APIs useful for emitting statistics for @TaskSlotCountStatsMonitor
*/ */
long getTotalTaskSlotCount(); Map<String, Long> getTotalTaskSlotCount();
long getIdleTaskSlotCount(); Map<String, Long> getIdleTaskSlotCount();
long getUsedTaskSlotCount(); Map<String, Long> getUsedTaskSlotCount();
long getLazyTaskSlotCount(); Map<String, Long> getLazyTaskSlotCount();
long getBlacklistedTaskSlotCount(); Map<String, Long> getBlacklistedTaskSlotCount();
} }

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
@ -63,6 +64,7 @@ import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -452,33 +454,43 @@ public class ThreadingTaskRunner
} }
@Override @Override
public long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(workerConfig.getCapacity()));
}
public long getTotalTaskSlotCountLong()
{ {
return workerConfig.getCapacity(); return workerConfig.getCapacity();
} }
@Override @Override
public long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0); return ImmutableMap.of(workerConfig.getCategory(), Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0));
} }
@Override @Override
public long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{
return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(getRunningTasks().size()));
}
public long getUsedTaskSlotCountLong()
{ {
return getRunningTasks().size(); return getRunningTasks().size();
} }
@Override @Override
public long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
return 0; return ImmutableMap.of(workerConfig.getCategory(), 0L);
} }
@Override @Override
public long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
return 0; return ImmutableMap.of(workerConfig.getCategory(), 0L);
} }
@Override @Override

View File

@ -90,6 +90,7 @@ import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -1628,55 +1629,80 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
} }
@Override @Override
public long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{ {
long totalPeons = 0; Map<String, Long> totalPeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getWorkers()) { for (ImmutableWorkerInfo worker : getWorkers()) {
totalPeons += worker.getWorker().getCapacity(); String workerCategory = worker.getWorker().getCategory();
int workerCapacity = worker.getWorker().getCapacity();
totalPeons.compute(
workerCategory,
(category, totalCapacity) -> totalCapacity == null ? workerCapacity : totalCapacity + workerCapacity
);
} }
return totalPeons; return totalPeons;
} }
@Override @Override
public long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
long totalIdlePeons = 0; Map<String, Long> totalIdlePeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) {
totalIdlePeons += worker.getAvailableCapacity(); String workerCategory = worker.getWorker().getCategory();
int workerAvailableCapacity = worker.getAvailableCapacity();
totalIdlePeons.compute(
workerCategory,
(category, availableCapacity) -> availableCapacity == null ? workerAvailableCapacity : availableCapacity + workerAvailableCapacity
);
} }
return totalIdlePeons; return totalIdlePeons;
} }
@Override @Override
public long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{ {
long totalUsedPeons = 0; Map<String, Long> totalUsedPeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getWorkers()) { for (ImmutableWorkerInfo worker : getWorkers()) {
totalUsedPeons += worker.getCurrCapacityUsed(); String workerCategory = worker.getWorker().getCategory();
int workerUsedCapacity = worker.getCurrCapacityUsed();
totalUsedPeons.compute(
workerCategory,
(category, usedCapacity) -> usedCapacity == null ? workerUsedCapacity : usedCapacity + workerUsedCapacity
);
} }
return totalUsedPeons; return totalUsedPeons;
} }
@Override @Override
public long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
long totalLazyPeons = 0; Map<String, Long> totalLazyPeons = new HashMap<>();
for (Worker worker : getLazyWorkers()) { for (Worker worker : getLazyWorkers()) {
totalLazyPeons += worker.getCapacity(); String workerCategory = worker.getCategory();
int workerLazyPeons = worker.getCapacity();
totalLazyPeons.compute(
workerCategory,
(category, lazyPeons) -> lazyPeons == null ? workerLazyPeons : lazyPeons + workerLazyPeons
);
} }
return totalLazyPeons; return totalLazyPeons;
} }
@Override @Override
public long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
long totalBlacklistedPeons = 0; Map<String, Long> totalBlacklistedPeons = new HashMap<>();
for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { for (ImmutableWorkerInfo worker : getBlackListedWorkers()) {
totalBlacklistedPeons += worker.getWorker().getCapacity(); String workerCategory = worker.getWorker().getCategory();
int workerBlacklistedPeons = worker.getWorker().getCapacity();
totalBlacklistedPeons.compute(
workerCategory,
(category, blacklistedPeons) -> blacklistedPeons == null ? workerBlacklistedPeons : blacklistedPeons + workerBlacklistedPeons
);
} }
return totalBlacklistedPeons; return totalBlacklistedPeons;

View File

@ -82,6 +82,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -405,31 +406,31 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
} }
@Override @Override
public long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -40,6 +40,7 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
@ -107,9 +108,9 @@ public class RemoteTaskRunnerTest
{ {
doSetup(); doSetup();
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount()); Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount()); Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task); ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
@ -124,9 +125,9 @@ public class RemoteTaskRunnerTest
cf.delete().guaranteed().forPath(JOINER.join(STATUS_PATH, task.getId())); cf.delete().guaranteed().forPath(JOINER.join(STATUS_PATH, task.getId()));
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount()); Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount()); Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
} }
@Test @Test
@ -437,8 +438,8 @@ public class RemoteTaskRunnerTest
public void testWorkerRemoved() throws Exception public void testWorkerRemoved() throws Exception
{ {
doSetup(); doSetup();
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount()); Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Future<TaskStatus> future = remoteTaskRunner.run(task); Future<TaskStatus> future = remoteTaskRunner.run(task);
@ -471,8 +472,8 @@ public class RemoteTaskRunnerTest
); );
Assert.assertNull(cf.checkExists().forPath(STATUS_PATH)); Assert.assertNull(cf.checkExists().forPath(STATUS_PATH));
Assert.assertEquals(0, remoteTaskRunner.getTotalTaskSlotCount()); Assert.assertFalse(remoteTaskRunner.getTotalTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount()); Assert.assertFalse(remoteTaskRunner.getIdleTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
} }
@Test @Test
@ -677,9 +678,9 @@ public class RemoteTaskRunnerTest
); );
Assert.assertEquals(1, lazyworkers.size()); Assert.assertEquals(1, lazyworkers.size());
Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size()); Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount()); Assert.assertFalse(remoteTaskRunner.getIdleTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount()); Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
} }
@Test @Test
@ -990,12 +991,12 @@ public class RemoteTaskRunnerTest
mockWorkerCompleteFailedTask(task1); mockWorkerCompleteFailedTask(task1);
Assert.assertTrue(taskFuture1.get().isFailure()); Assert.assertTrue(taskFuture1.get().isFailure());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount()); Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
Future<TaskStatus> taskFuture2 = remoteTaskRunner.run(task2); Future<TaskStatus> taskFuture2 = remoteTaskRunner.run(task2);
Assert.assertTrue(taskAnnounced(task2.getId())); Assert.assertTrue(taskAnnounced(task2.getId()));
mockWorkerRunningTask(task2); mockWorkerRunningTask(task2);
Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount()); Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
Future<TaskStatus> taskFuture3 = remoteTaskRunner.run(task3); Future<TaskStatus> taskFuture3 = remoteTaskRunner.run(task3);
Assert.assertTrue(taskAnnounced(task3.getId())); Assert.assertTrue(taskAnnounced(task3.getId()));
@ -1003,12 +1004,12 @@ public class RemoteTaskRunnerTest
mockWorkerCompleteFailedTask(task3); mockWorkerCompleteFailedTask(task3);
Assert.assertTrue(taskFuture3.get().isFailure()); Assert.assertTrue(taskFuture3.get().isFailure());
Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size()); Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount()); Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
mockWorkerCompleteSuccessfulTask(task2); mockWorkerCompleteSuccessfulTask(task2);
Assert.assertTrue(taskFuture2.get().isSuccess()); Assert.assertTrue(taskFuture2.get().isSuccess());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount()); Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
} }
@Test @Test

View File

@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
@ -493,33 +494,33 @@ public class TaskQueueTest extends IngestionTestBase
} }
@Override @Override
public long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{ {
return 0; return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
} }
@Override @Override
public long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
return 0; return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
} }
@Override @Override
public long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{ {
return 0; return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
} }
@Override @Override
public long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
return 0; return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
} }
@Override @Override
public long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
return 0; return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
} }
} }
} }

View File

@ -273,31 +273,31 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
} }
@Override @Override
public long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -813,6 +813,7 @@ public class HttpRemoteTaskRunnerTest
Task task1 = NoopTask.create("task-id-1", 0); Task task1 = NoopTask.create("task-id-1", 0);
Task task2 = NoopTask.create("task-id-2", 0); Task task2 = NoopTask.create("task-id-2", 0);
String additionalWorkerCategory = "category2";
ConcurrentMap<String, CustomFunction> workerHolders = new ConcurrentHashMap<>(); ConcurrentMap<String, CustomFunction> workerHolders = new ConcurrentHashMap<>();
@ -864,9 +865,9 @@ public class HttpRemoteTaskRunnerTest
taskRunner.start(); taskRunner.start();
Assert.assertEquals(0, taskRunner.getTotalTaskSlotCount()); Assert.assertTrue(taskRunner.getTotalTaskSlotCount().isEmpty());
Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); Assert.assertTrue(taskRunner.getIdleTaskSlotCount().isEmpty());
Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount()); Assert.assertTrue(taskRunner.getUsedTaskSlotCount().isEmpty());
AtomicInteger ticks = new AtomicInteger(); AtomicInteger ticks = new AtomicInteger();
@ -910,9 +911,9 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1)); druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount()); Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount()); Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount()); Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
taskRunner.run(task1); taskRunner.run(task1);
@ -920,16 +921,16 @@ public class HttpRemoteTaskRunnerTest
Thread.sleep(100); Thread.sleep(100);
} }
Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount()); Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", false, 8080, null, true, false), new DruidNode("service", "host2", false, 8080, null, true, false),
NodeRole.MIDDLE_MANAGER, NodeRole.MIDDLE_MANAGER,
ImmutableMap.of( ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, WorkerNodeService.DISCOVERY_SERVICE_KEY,
new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY) new WorkerNodeService("ip2", 1, "0", additionalWorkerCategory)
) )
); );
@ -952,9 +953,12 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2)); druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2));
Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount()); Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount()); Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(additionalWorkerCategory).longValue());
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
taskRunner.run(task2); taskRunner.run(task2);
@ -962,9 +966,12 @@ public class HttpRemoteTaskRunnerTest
Thread.sleep(100); Thread.sleep(100);
} }
Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount()); Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory));
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode( DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode(
new DruidNode("service", "host3", false, 8080, null, true, false), new DruidNode("service", "host3", false, 8080, null, true, false),
@ -994,10 +1001,14 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3)); druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3));
Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount()); Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount()); Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount()); Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory));
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory));
Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId()); Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
@ -1008,10 +1019,14 @@ public class HttpRemoteTaskRunnerTest
.getHost() .getHost()
); );
Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount()); Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount()); Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory));
Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory));
} }
/* /*

View File

@ -80,9 +80,11 @@ import org.junit.Test;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -455,31 +457,31 @@ public class OverlordTest
} }
@Override @Override
public long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -24,6 +24,8 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor; import org.apache.druid.java.util.metrics.AbstractMonitor;
import java.util.Map;
public class TaskSlotCountStatsMonitor extends AbstractMonitor public class TaskSlotCountStatsMonitor extends AbstractMonitor
{ {
private final TaskSlotCountStatsProvider statsProvider; private final TaskSlotCountStatsProvider statsProvider;
@ -47,11 +49,14 @@ public class TaskSlotCountStatsMonitor extends AbstractMonitor
return true; return true;
} }
private void emit(ServiceEmitter emitter, String key, Long count) private void emit(ServiceEmitter emitter, String key, Map<String, Long> counts)
{ {
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
if (count != null) { if (counts != null) {
emitter.emit(builder.build(key, count.longValue())); counts.forEach((k, v) -> {
builder.setDimension("category", k);
emitter.emit(builder.build(key, v));
});
} }
} }
} }

View File

@ -21,35 +21,37 @@ package org.apache.druid.server.metrics;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Map;
public interface TaskSlotCountStatsProvider public interface TaskSlotCountStatsProvider
{ {
/** /**
* Return the number of total task slots during emission period. * Return the number of total task slots during emission period.
*/ */
@Nullable @Nullable
Long getTotalTaskSlotCount(); Map<String, Long> getTotalTaskSlotCount();
/** /**
* Return the number of idle task slots during emission period. * Return the number of idle task slots during emission period.
*/ */
@Nullable @Nullable
Long getIdleTaskSlotCount(); Map<String, Long> getIdleTaskSlotCount();
/** /**
* Return the number of used task slots during emission period. * Return the number of used task slots during emission period.
*/ */
@Nullable @Nullable
Long getUsedTaskSlotCount(); Map<String, Long> getUsedTaskSlotCount();
/** /**
* Return the total number of task slots in lazy marked middlemanagers and indexers during emission period. * Return the total number of task slots in lazy marked middlemanagers and indexers during emission period.
*/ */
@Nullable @Nullable
Long getLazyTaskSlotCount(); Map<String, Long> getLazyTaskSlotCount();
/** /**
* Return the total number of task slots in blacklisted middlemanagers and indexers during emission period. * Return the total number of task slots in blacklisted middlemanagers and indexers during emission period.
*/ */
@Nullable @Nullable
Long getBlacklistedTaskSlotCount(); Map<String, Long> getBlacklistedTaskSlotCount();
} }

View File

@ -19,11 +19,14 @@
package org.apache.druid.server.metrics; package org.apache.druid.server.metrics;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Map;
public class TaskSlotCountStatsMonitorTest public class TaskSlotCountStatsMonitorTest
{ {
private TaskSlotCountStatsProvider statsProvider; private TaskSlotCountStatsProvider statsProvider;
@ -34,33 +37,33 @@ public class TaskSlotCountStatsMonitorTest
statsProvider = new TaskSlotCountStatsProvider() statsProvider = new TaskSlotCountStatsProvider()
{ {
@Override @Override
public Long getTotalTaskSlotCount() public Map<String, Long> getTotalTaskSlotCount()
{ {
return 1L; return ImmutableMap.of("c1", 1L);
} }
@Override @Override
public Long getIdleTaskSlotCount() public Map<String, Long> getIdleTaskSlotCount()
{ {
return 1L; return ImmutableMap.of("c1", 1L);
} }
@Override @Override
public Long getUsedTaskSlotCount() public Map<String, Long> getUsedTaskSlotCount()
{ {
return 1L; return ImmutableMap.of("c1", 1L);
} }
@Override @Override
public Long getLazyTaskSlotCount() public Map<String, Long> getLazyTaskSlotCount()
{ {
return 1L; return ImmutableMap.of("c1", 1L);
} }
@Override @Override
public Long getBlacklistedTaskSlotCount() public Map<String, Long> getBlacklistedTaskSlotCount()
{ {
return 1L; return ImmutableMap.of("c1", 1L);
} }
}; };
} }