diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 1c7c736e3c4..ef39085381d 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -210,11 +210,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| -|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| -|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| -|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| -|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| -|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.| +|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| +|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| +|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| +|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| +|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| ## Shuffle metrics (Native parallel task) diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 5ac08862409..6c0fa75d1a1 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -63,11 +63,11 @@ "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, - "taskSlot/total/count" : { "dimensions" : [], "type" : "gauge" }, - "taskSlot/idle/count" : { "dimensions" : [], "type" : "gauge" }, - "taskSlot/busy/count" : { "dimensions" : [], "type" : "gauge" }, - "taskSlot/lazy/count" : { "dimensions" : [], "type" : "gauge" }, - "taskSlot/blacklisted/count" : { "dimensions" : [], "type" : "gauge" }, + "taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge" }, + "taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge" }, + "taskSlot/busy/count" : { "dimensions" : ["category"], "type" : "gauge" }, + "taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge" }, + "taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge" }, "task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" }, "segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" }, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 212c0390a77..f9c36ef5810 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -27,6 +27,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -98,6 +99,7 @@ public class ForkingTaskRunner private final ListeningExecutorService exec; private final PortFinder portFinder; private final StartupLoggingConfig startupLoggingConfig; + private final WorkerConfig workerConfig; private volatile boolean stopping = false; @@ -120,6 +122,7 @@ public class ForkingTaskRunner this.node = node; this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts()); this.startupLoggingConfig = startupLoggingConfig; + this.workerConfig = workerConfig; this.exec = MoreExecutors.listeningDecorator( Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d") ); @@ -666,7 +669,15 @@ public class ForkingTaskRunner } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() + { + if (config.getPorts() != null && !config.getPorts().isEmpty()) { + return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getPorts().size())); + } + return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1)); + } + + public long getTotalTaskSlotCountLong() { if (config.getPorts() != null && !config.getPorts().isEmpty()) { return config.getPorts().size(); @@ -675,27 +686,32 @@ public class ForkingTaskRunner } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0); + return ImmutableMap.of(workerConfig.getCategory(), Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0)); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() + { + return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(portFinder.findUsedPortCount())); + } + + public long getUsedTaskSlotCountLong() { return portFinder.findUsedPortCount(); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - return 0; + return ImmutableMap.of(workerConfig.getCategory(), 0L); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - return 0; + return ImmutableMap.of(workerConfig.getCategory(), 0L); } protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index de0713bc506..3c1cbb608e4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -95,6 +95,7 @@ import java.net.URL; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -1514,55 +1515,80 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - long totalPeons = 0; + Map totalPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { - totalPeons += worker.getWorker().getCapacity(); + String workerCategory = worker.getWorker().getCategory(); + int workerCapacity = worker.getWorker().getCapacity(); + totalPeons.compute( + workerCategory, + (category, totalCapacity) -> totalCapacity == null ? workerCapacity : totalCapacity + workerCapacity + ); } return totalPeons; } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - long totalIdlePeons = 0; + Map totalIdlePeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { - totalIdlePeons += worker.getAvailableCapacity(); + String workerCategory = worker.getWorker().getCategory(); + int workerAvailableCapacity = worker.getAvailableCapacity(); + totalIdlePeons.compute( + workerCategory, + (category, availableCapacity) -> availableCapacity == null ? workerAvailableCapacity : availableCapacity + workerAvailableCapacity + ); } return totalIdlePeons; } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - long totalUsedPeons = 0; + Map totalUsedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { - totalUsedPeons += worker.getCurrCapacityUsed(); + String workerCategory = worker.getWorker().getCategory(); + int workerUsedCapacity = worker.getCurrCapacityUsed(); + totalUsedPeons.compute( + workerCategory, + (category, usedCapacity) -> usedCapacity == null ? workerUsedCapacity : usedCapacity + workerUsedCapacity + ); } return totalUsedPeons; } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - long totalLazyPeons = 0; + Map totalLazyPeons = new HashMap<>(); for (Worker worker : getLazyWorkers()) { - totalLazyPeons += worker.getCapacity(); + String workerCategory = worker.getCategory(); + int workerLazyPeons = worker.getCapacity(); + totalLazyPeons.compute( + workerCategory, + (category, lazyPeons) -> lazyPeons == null ? workerLazyPeons : lazyPeons + workerLazyPeons + ); } return totalLazyPeons; } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - long totalBlacklistedPeons = 0; + Map totalBlacklistedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { - totalBlacklistedPeons += worker.getWorker().getCapacity(); + String workerCategory = worker.getWorker().getCategory(); + int workerBlacklistedPeons = worker.getWorker().getCapacity(); + totalBlacklistedPeons.compute( + workerCategory, + (category, blacklistedPeons) -> blacklistedPeons == null ? workerBlacklistedPeons : blacklistedPeons + workerBlacklistedPeons + ); } return totalBlacklistedPeons; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index 24dba4f6520..b421f97519d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -59,6 +59,7 @@ import org.joda.time.Interval; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -334,34 +335,39 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke return Optional.absent(); } + /* This method should be never called in peons */ @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - return 1; + throw new UnsupportedOperationException(); } + /* This method should be never called in peons */ @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - return runningItem == null ? 1 : 0; + throw new UnsupportedOperationException(); } + /* This method should be never called in peons */ @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - return runningItem == null ? 0 : 1; + throw new UnsupportedOperationException(); } + /* This method should be never called in peons */ @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - return 0; + throw new UnsupportedOperationException(); } + /* This method should be never called in peons */ @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - return 0; + throw new UnsupportedOperationException(); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 54287ed1422..5825a2fb03f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -346,7 +346,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro @Override @Nullable - public Long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { Optional taskRunner = getTaskRunner(); if (taskRunner.isPresent()) { @@ -358,7 +358,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro @Override @Nullable - public Long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { Optional taskRunner = getTaskRunner(); if (taskRunner.isPresent()) { @@ -370,7 +370,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro @Override @Nullable - public Long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { Optional taskRunner = getTaskRunner(); if (taskRunner.isPresent()) { @@ -382,7 +382,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro @Override @Nullable - public Long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { Optional taskRunner = getTaskRunner(); if (taskRunner.isPresent()) { @@ -394,7 +394,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro @Override @Nullable - public Long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { Optional taskRunner = getTaskRunner(); if (taskRunner.isPresent()) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index 8623e820b2b..5cbdabb3eda 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.Executor; /** @@ -124,14 +125,14 @@ public interface TaskRunner /** * APIs useful for emitting statistics for @TaskSlotCountStatsMonitor - */ - long getTotalTaskSlotCount(); + */ + Map getTotalTaskSlotCount(); - long getIdleTaskSlotCount(); + Map getIdleTaskSlotCount(); - long getUsedTaskSlotCount(); + Map getUsedTaskSlotCount(); - long getLazyTaskSlotCount(); + Map getLazyTaskSlotCount(); - long getBlacklistedTaskSlotCount(); + Map getBlacklistedTaskSlotCount(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java index bff6d61e4e1..79a67b41418 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java @@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Futures; @@ -63,6 +64,7 @@ import java.io.File; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; @@ -452,33 +454,43 @@ public class ThreadingTaskRunner } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() + { + return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(workerConfig.getCapacity())); + } + + public long getTotalTaskSlotCountLong() { return workerConfig.getCapacity(); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0); + return ImmutableMap.of(workerConfig.getCategory(), Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0)); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() + { + return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(getRunningTasks().size())); + } + + public long getUsedTaskSlotCountLong() { return getRunningTasks().size(); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - return 0; + return ImmutableMap.of(workerConfig.getCategory(), 0L); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - return 0; + return ImmutableMap.of(workerConfig.getCategory(), 0L); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 59085a27afe..45af2157282 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -90,6 +90,7 @@ import java.io.InputStream; import java.net.URL; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -1628,55 +1629,80 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - long totalPeons = 0; + Map totalPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { - totalPeons += worker.getWorker().getCapacity(); + String workerCategory = worker.getWorker().getCategory(); + int workerCapacity = worker.getWorker().getCapacity(); + totalPeons.compute( + workerCategory, + (category, totalCapacity) -> totalCapacity == null ? workerCapacity : totalCapacity + workerCapacity + ); } return totalPeons; } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - long totalIdlePeons = 0; + Map totalIdlePeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) { - totalIdlePeons += worker.getAvailableCapacity(); + String workerCategory = worker.getWorker().getCategory(); + int workerAvailableCapacity = worker.getAvailableCapacity(); + totalIdlePeons.compute( + workerCategory, + (category, availableCapacity) -> availableCapacity == null ? workerAvailableCapacity : availableCapacity + workerAvailableCapacity + ); } return totalIdlePeons; } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - long totalUsedPeons = 0; + Map totalUsedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getWorkers()) { - totalUsedPeons += worker.getCurrCapacityUsed(); + String workerCategory = worker.getWorker().getCategory(); + int workerUsedCapacity = worker.getCurrCapacityUsed(); + totalUsedPeons.compute( + workerCategory, + (category, usedCapacity) -> usedCapacity == null ? workerUsedCapacity : usedCapacity + workerUsedCapacity + ); } return totalUsedPeons; } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - long totalLazyPeons = 0; + Map totalLazyPeons = new HashMap<>(); for (Worker worker : getLazyWorkers()) { - totalLazyPeons += worker.getCapacity(); + String workerCategory = worker.getCategory(); + int workerLazyPeons = worker.getCapacity(); + totalLazyPeons.compute( + workerCategory, + (category, lazyPeons) -> lazyPeons == null ? workerLazyPeons : lazyPeons + workerLazyPeons + ); } return totalLazyPeons; } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - long totalBlacklistedPeons = 0; + Map totalBlacklistedPeons = new HashMap<>(); for (ImmutableWorkerInfo worker : getBlackListedWorkers()) { - totalBlacklistedPeons += worker.getWorker().getCapacity(); + String workerCategory = worker.getWorker().getCategory(); + int workerBlacklistedPeons = worker.getWorker().getCapacity(); + totalBlacklistedPeons.compute( + workerCategory, + (category, blacklistedPeons) -> blacklistedPeons == null ? workerBlacklistedPeons : blacklistedPeons + workerBlacklistedPeons + ); } return totalBlacklistedPeons; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 81a6bcc6e9a..f2cdf5c320e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -82,6 +82,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; @@ -405,31 +406,31 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { throw new UnsupportedOperationException(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index 589c6e1f62f..edb6f642b11 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -40,6 +40,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -107,9 +108,9 @@ public class RemoteTaskRunnerTest { doSetup(); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); ListenableFuture result = remoteTaskRunner.run(task); @@ -124,9 +125,9 @@ public class RemoteTaskRunnerTest cf.delete().guaranteed().forPath(JOINER.join(STATUS_PATH, task.getId())); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); } @Test @@ -437,8 +438,8 @@ public class RemoteTaskRunnerTest public void testWorkerRemoved() throws Exception { doSetup(); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Future future = remoteTaskRunner.run(task); @@ -471,8 +472,8 @@ public class RemoteTaskRunnerTest ); Assert.assertNull(cf.checkExists().forPath(STATUS_PATH)); - Assert.assertEquals(0, remoteTaskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount()); + Assert.assertFalse(remoteTaskRunner.getTotalTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); + Assert.assertFalse(remoteTaskRunner.getIdleTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); } @Test @@ -677,9 +678,9 @@ public class RemoteTaskRunnerTest ); Assert.assertEquals(1, lazyworkers.size()); Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size()); - Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount()); + Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertFalse(remoteTaskRunner.getIdleTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); + Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); } @Test @@ -990,12 +991,12 @@ public class RemoteTaskRunnerTest mockWorkerCompleteFailedTask(task1); Assert.assertTrue(taskFuture1.get().isFailure()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount()); + Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); Future taskFuture2 = remoteTaskRunner.run(task2); Assert.assertTrue(taskAnnounced(task2.getId())); mockWorkerRunningTask(task2); - Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount()); + Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); Future taskFuture3 = remoteTaskRunner.run(task3); Assert.assertTrue(taskAnnounced(task3.getId())); @@ -1003,12 +1004,12 @@ public class RemoteTaskRunnerTest mockWorkerCompleteFailedTask(task3); Assert.assertTrue(taskFuture3.get().isFailure()); Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount()); + Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); mockWorkerCompleteSuccessfulTask(task2); Assert.assertTrue(taskFuture2.get().isSuccess()); Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size()); - Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount()); + Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 0b3d8728a61..dc5fc0e1c6a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; +import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -493,33 +494,33 @@ public class TaskQueueTest extends IngestionTestBase } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - return 0; + return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L); } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java index df293f0bcda..3a5df86a294 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java @@ -273,31 +273,31 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { throw new UnsupportedOperationException(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 7eeb50ee060..5a676113eda 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -813,6 +813,7 @@ public class HttpRemoteTaskRunnerTest Task task1 = NoopTask.create("task-id-1", 0); Task task2 = NoopTask.create("task-id-2", 0); + String additionalWorkerCategory = "category2"; ConcurrentMap workerHolders = new ConcurrentHashMap<>(); @@ -864,9 +865,9 @@ public class HttpRemoteTaskRunnerTest taskRunner.start(); - Assert.assertEquals(0, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount()); + Assert.assertTrue(taskRunner.getTotalTaskSlotCount().isEmpty()); + Assert.assertTrue(taskRunner.getIdleTaskSlotCount().isEmpty()); + Assert.assertTrue(taskRunner.getUsedTaskSlotCount().isEmpty()); AtomicInteger ticks = new AtomicInteger(); @@ -910,9 +911,9 @@ public class HttpRemoteTaskRunnerTest druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1)); - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); taskRunner.run(task1); @@ -920,16 +921,16 @@ public class HttpRemoteTaskRunnerTest Thread.sleep(100); } - Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", false, 8080, null, true, false), NodeRole.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, - new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY) + new WorkerNodeService("ip2", 1, "0", additionalWorkerCategory) ) ); @@ -952,9 +953,12 @@ public class HttpRemoteTaskRunnerTest druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2)); - Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(additionalWorkerCategory).longValue()); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue()); taskRunner.run(task2); @@ -962,9 +966,12 @@ public class HttpRemoteTaskRunnerTest Thread.sleep(100); } - Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory)); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue()); DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode( new DruidNode("service", "host3", false, 8080, null, true, false), @@ -994,10 +1001,14 @@ public class HttpRemoteTaskRunnerTest druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3)); - Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount()); + Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue()); + Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory)); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue()); + Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY)); + Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory)); Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId()); Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId()); @@ -1008,10 +1019,14 @@ public class HttpRemoteTaskRunnerTest .getHost() ); - Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount()); - Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount()); - Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount()); + Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue()); + Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory)); + Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue()); + Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory)); } /* diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index aa70b2cabdc..5655a13345d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -80,9 +80,11 @@ import org.junit.Test; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; + import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -455,31 +457,31 @@ public class OverlordTest } @Override - public long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { throw new UnsupportedOperationException(); } @Override - public long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java index d8accc8892d..3431e1e143b 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java @@ -24,6 +24,8 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; +import java.util.Map; + public class TaskSlotCountStatsMonitor extends AbstractMonitor { private final TaskSlotCountStatsProvider statsProvider; @@ -47,11 +49,14 @@ public class TaskSlotCountStatsMonitor extends AbstractMonitor return true; } - private void emit(ServiceEmitter emitter, String key, Long count) + private void emit(ServiceEmitter emitter, String key, Map counts) { final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - if (count != null) { - emitter.emit(builder.build(key, count.longValue())); + if (counts != null) { + counts.forEach((k, v) -> { + builder.setDimension("category", k); + emitter.emit(builder.build(key, v)); + }); } } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java index eb46fa5c2f1..e7a7249b10a 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java @@ -21,35 +21,37 @@ package org.apache.druid.server.metrics; import javax.annotation.Nullable; +import java.util.Map; + public interface TaskSlotCountStatsProvider { /** * Return the number of total task slots during emission period. */ @Nullable - Long getTotalTaskSlotCount(); + Map getTotalTaskSlotCount(); /** * Return the number of idle task slots during emission period. */ @Nullable - Long getIdleTaskSlotCount(); + Map getIdleTaskSlotCount(); /** * Return the number of used task slots during emission period. */ @Nullable - Long getUsedTaskSlotCount(); + Map getUsedTaskSlotCount(); /** * Return the total number of task slots in lazy marked middlemanagers and indexers during emission period. */ @Nullable - Long getLazyTaskSlotCount(); + Map getLazyTaskSlotCount(); /** * Return the total number of task slots in blacklisted middlemanagers and indexers during emission period. */ @Nullable - Long getBlacklistedTaskSlotCount(); + Map getBlacklistedTaskSlotCount(); } diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java index 2c5c52b99e5..be33a4c98b6 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java @@ -19,11 +19,14 @@ package org.apache.druid.server.metrics; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Map; + public class TaskSlotCountStatsMonitorTest { private TaskSlotCountStatsProvider statsProvider; @@ -34,33 +37,33 @@ public class TaskSlotCountStatsMonitorTest statsProvider = new TaskSlotCountStatsProvider() { @Override - public Long getTotalTaskSlotCount() + public Map getTotalTaskSlotCount() { - return 1L; + return ImmutableMap.of("c1", 1L); } @Override - public Long getIdleTaskSlotCount() + public Map getIdleTaskSlotCount() { - return 1L; + return ImmutableMap.of("c1", 1L); } @Override - public Long getUsedTaskSlotCount() + public Map getUsedTaskSlotCount() { - return 1L; + return ImmutableMap.of("c1", 1L); } @Override - public Long getLazyTaskSlotCount() + public Map getLazyTaskSlotCount() { - return 1L; + return ImmutableMap.of("c1", 1L); } @Override - public Long getBlacklistedTaskSlotCount() + public Map getBlacklistedTaskSlotCount() { - return 1L; + return ImmutableMap.of("c1", 1L); } }; }