mirror of https://github.com/apache/druid.git
Add new coordinator metrics for coordinator duty runtimes (#10603)
* Add new coordinator metrics for duty runtimes
* Fix spelling for a constant variable value
* Add comment clarifying why the global runtime metric is emitted where it is
* Remove the duty alias in favor of using the class name for metrics
* Fix docs
* Add CoordinatorStats tests and include duty stats in the accumulate() logic
parent 30bcb0fd74
commit 2560bf0a19
docs/operations/metrics.md

@@ -254,6 +254,8 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
 |`segment/skipCompact/bytes`|Total bytes of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
 |`segment/skipCompact/count`|Total number of segments of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
 |`interval/skipCompact/count`|Total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
+|`coordinator/time`|Approximate Coordinator duty runtime in milliseconds. The duty dimension is the string alias of the Duty that is being run.|duty.|Varies.|
+|`coordinator/global/time`|Approximate runtime of a full coordination cycle in milliseconds. The `dutyGroup` dimension indicates what type of coordination this run was. i.e. Historical Management vs Indexing|`dutyGroup`|Varies.|

 If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](
 ../configuration/index.html#dynamic-configuration), then [log entries](../configuration/logging.md) for class
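To make the relationship between these two metrics concrete, here is a minimal sketch (not part of this commit) of how they are produced by the coordinator code changed below. The stat key "runtime", the DruidMetrics.DUTY and DUTY_GROUP constants, and the metric names are taken from the diff; the method and its parameters are illustrative stand-ins for state that DutiesRunnable and EmitClusterStatsAndMetrics actually hold.

// Illustrative sketch only; assumes the Druid classes shown in the diff below are on the classpath.
void sketchDutyRuntimeMetrics(
    ServiceEmitter emitter,
    CoordinatorStats stats,
    CoordinatorDuty duty,
    long dutyRuntimeNanos,   // measured around duty.run(params) in DutiesRunnable
    String dutyGroup,        // e.g. "HistoricalManagementDuties"
    long cycleRuntimeNanos   // measured around the whole DutiesRunnable.run()
)
{
  // Per-duty runtime is recorded against the duty's class name...
  stats.addToDutyStat("runtime", duty.getClass().getName(), TimeUnit.NANOSECONDS.toMillis(dutyRuntimeNanos));

  // ...and later emitted by EmitClusterStatsAndMetrics as "coordinator/time", one event per duty.
  stats.forEachDutyStat(
      "runtime",
      (dutyName, millis) -> emitter.emit(
          new ServiceMetricEvent.Builder()
              .setDimension(DruidMetrics.DUTY, dutyName)
              .build("coordinator/time", millis)
      )
  );

  // The wall-clock time of the whole duty group is emitted directly as "coordinator/global/time".
  emitter.emit(
      new ServiceMetricEvent.Builder()
          .setDimension(DruidMetrics.DUTY_GROUP, dutyGroup)
          .build("coordinator/global/time", TimeUnit.NANOSECONDS.toMillis(cycleRuntimeNanos))
  );
}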
DruidMetrics.java

@@ -42,6 +42,9 @@ public class DruidMetrics
   public static final String SERVER = "server";
   public static final String TIER = "tier";

+  public static final String DUTY = "duty";
+  public static final String DUTY_GROUP = "dutyGroup";
+
   public static int findNumComplexAggs(List<AggregatorFactory> aggs)
   {
     int retVal = 0;
CoordinatorStats.java

@@ -35,12 +35,14 @@ public class CoordinatorStats
 {
   private final Map<String, Object2LongOpenHashMap<String>> perTierStats;
   private final Map<String, Object2LongOpenHashMap<String>> perDataSourceStats;
+  private final Map<String, Object2LongOpenHashMap<String>> perDutyStats;
   private final Object2LongOpenHashMap<String> globalStats;

   public CoordinatorStats()
   {
     perTierStats = new HashMap<>();
     perDataSourceStats = new HashMap<>();
+    perDutyStats = new HashMap<>();
     globalStats = new Object2LongOpenHashMap<>();
   }
@@ -54,6 +56,11 @@ public class CoordinatorStats
     return !perDataSourceStats.isEmpty();
   }

+  public boolean hasPerDutyStats()
+  {
+    return !perDutyStats.isEmpty();
+  }
+
   public Set<String> getTiers(final String statName)
   {
     final Object2LongOpenHashMap<String> theStat = perTierStats.get(statName);
@@ -72,6 +79,15 @@ public class CoordinatorStats
     return Collections.unmodifiableSet(stat.keySet());
   }

+  public Set<String> getDuties(String statName)
+  {
+    final Object2LongOpenHashMap<String> stat = perDutyStats.get(statName);
+    if (stat == null) {
+      return Collections.emptySet();
+    }
+    return Collections.unmodifiableSet(stat.keySet());
+  }
+
   /**
    *
    * @param statName the name of the statistics
@@ -109,6 +125,21 @@ public class CoordinatorStats
     }
   }

+  public long getDutyStat(String statName, String duty)
+  {
+    return perDutyStats.get(statName).getLong(duty);
+  }
+
+  public void forEachDutyStat(String statName, ObjLongConsumer<String> consumer)
+  {
+    final Object2LongOpenHashMap<String> stat = perDutyStats.get(statName);
+    if (stat != null) {
+      for (Entry<String> entry : stat.object2LongEntrySet()) {
+        consumer.accept(entry.getKey(), entry.getLongValue());
+      }
+    }
+  }
+
   public long getGlobalStat(final String statName)
   {
     return globalStats.getLong(statName);
@@ -132,6 +163,12 @@ public class CoordinatorStats
                       .addTo(dataSource, value);
   }

+  public void addToDutyStat(String statName, String duty, long value)
+  {
+    perDutyStats.computeIfAbsent(statName, k -> new Object2LongOpenHashMap<>())
+                .addTo(duty, value);
+  }
+
   public void addToGlobalStat(final String statName, final long value)
   {
     globalStats.addTo(statName, value);
@@ -166,6 +203,20 @@ public class CoordinatorStats
         }
     );

+    stats.perDutyStats.forEach(
+        (statName, urStat) -> {
+          final Object2LongOpenHashMap<String> myStat = perDutyStats.computeIfAbsent(
+              statName,
+              k -> new Object2LongOpenHashMap<>()
+          );
+
+          for (Entry<String> entry : urStat.object2LongEntrySet()) {
+            myStat.addTo(entry.getKey(), entry.getLongValue());
+          }
+        }
+    );
+
     for (final Object2LongMap.Entry<String> entry : stats.globalStats.object2LongEntrySet()) {
       globalStats.addTo(entry.getKey(), entry.getLongValue());
     }
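As a quick illustration of the new accumulate() behavior (CoordinatorStatsTest later in this commit covers the same ground), per-duty values from a second CoordinatorStats instance are summed into the first, keyed by (statName, duty). The stat and duty names below are made up for the example.

// Illustrative only; mirrors CoordinatorStatsTest further down.
CoordinatorStats first = new CoordinatorStats();
first.addToDutyStat("runtime", "dutyA", 10);

CoordinatorStats second = new CoordinatorStats();
second.addToDutyStat("runtime", "dutyA", 5);
second.addToDutyStat("runtime", "dutyB", 7);

first.accumulate(second);
// first.getDutyStat("runtime", "dutyA") == 15
// first.getDutyStat("runtime", "dutyB") == 7
// first.getDuties("runtime") == {"dutyA", "dutyB"}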
DruidCoordinator.java

@@ -60,8 +60,10 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.duty.BalanceSegments;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
@@ -93,6 +95,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;

 /**
@@ -158,6 +161,10 @@ public class DruidCoordinator
   private int cachedBalancerThreadNumber;
   private ListeningExecutorService balancerExec;

+  private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
+  private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
+  private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties";
+
   @Inject
   public DruidCoordinator(
       DruidCoordinatorConfig config,
@@ -573,7 +580,7 @@ public class DruidCoordinator
   public void runCompactSegmentsDuty()
   {
     final int startingLeaderCounter = coordLeaderSelector.localTerm();
-    DutiesRunnable compactSegmentsDuty = new DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter);
+    DutiesRunnable compactSegmentsDuty = new DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter, COMPACT_SEGMENTS_DUTIES_DUTY_GROUP);
     compactSegmentsDuty.run();
   }
@@ -598,14 +605,14 @@ public class DruidCoordinator
     final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables = new ArrayList<>();
     dutiesRunnables.add(
         Pair.of(
-            new DutiesRunnable(makeHistoricalManagementDuties(), startingLeaderCounter),
+            new DutiesRunnable(makeHistoricalManagementDuties(), startingLeaderCounter, HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP),
             config.getCoordinatorPeriod()
         )
     );
     if (indexingServiceClient != null) {
       dutiesRunnables.add(
           Pair.of(
-              new DutiesRunnable(makeIndexingServiceDuties(), startingLeaderCounter),
+              new DutiesRunnable(makeIndexingServiceDuties(), startingLeaderCounter, INDEXING_SERVICE_DUTIES_DUTY_GROUP),
               config.getCoordinatorIndexingPeriod()
           )
       );
@@ -706,11 +713,13 @@ public class DruidCoordinator
     private final long startTimeNanos = System.nanoTime();
     private final List<CoordinatorDuty> duties;
     private final int startingLeaderCounter;
+    private final String dutiesRunnableAlias;

-    protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
+    protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter, String alias)
     {
       this.duties = duties;
       this.startingLeaderCounter = startingLeaderCounter;
+      this.dutiesRunnableAlias = alias;
     }

     @VisibleForTesting
@@ -747,6 +756,7 @@ public class DruidCoordinator
     public void run()
     {
       try {
+        final long globalStart = System.nanoTime();
         synchronized (lock) {
           if (!coordLeaderSelector.isLeader()) {
             log.info("LEGGO MY EGGO. [%s] is leader.", coordLeaderSelector.getCurrentLeader());
@@ -801,14 +811,24 @@ public class DruidCoordinator
               && coordLeaderSelector.isLeader()
               && startingLeaderCounter == coordLeaderSelector.localTerm()) {

+            final long start = System.nanoTime();
             params = duty.run(params);
+            final long end = System.nanoTime();

             if (params == null) {
               // This duty wanted to cancel the run. No log message, since the duty should have logged a reason.
               return;
-            }
+            } else {
+              params.getCoordinatorStats().addToDutyStat("runtime", duty.getClass().getName(), TimeUnit.NANOSECONDS.toMillis(end - start));
+            }
           }
         }
+        // Emit the runtime of the full DutiesRunnable
+        params.getEmitter().emit(
+            new ServiceMetricEvent.Builder()
+                .setDimension(DruidMetrics.DUTY_GROUP, dutiesRunnableAlias)
+                .build("coordinator/global/time", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - globalStart))
+        );
       }
       catch (Exception e) {
         log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
EmitClusterStatsAndMetrics.java

@@ -97,6 +97,35 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
     );
   }

+  private void emitDutyStat(
+      final ServiceEmitter emitter,
+      final String metricName,
+      final String duty,
+      final long value
+  )
+  {
+    emitter.emit(
+        new ServiceMetricEvent.Builder()
+            .setDimension(DruidMetrics.DUTY, duty)
+            .build(metricName, value)
+    );
+  }
+
+  private void emitDutyStats(
+      final ServiceEmitter emitter,
+      final String metricName,
+      final CoordinatorStats stats,
+      final String statName
+  )
+  {
+    stats.forEachDutyStat(
+        statName,
+        (final String duty, final long count) -> {
+          emitDutyStat(emitter, metricName, duty, count);
+        }
+    );
+  }
+
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
@@ -435,6 +464,9 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
         }
     );

+    // Emit coordinator runtime stats
+    emitDutyStats(emitter, "coordinator/time", stats, "runtime");
+
     return params;
   }
 }
CoordinatorStatsTest.java

@@ -116,12 +116,18 @@ public class CoordinatorStatsTest
     stats.addToTieredStat("stat1", "tier1", 1);
     stats.addToTieredStat("stat1", "tier2", 1);
     stats.addToTieredStat("stat2", "tier1", 1);
+    stats.addToDutyStat("stat1", "duty1", 1);
+    stats.addToDutyStat("stat1", "duty2", 1);
+    stats.addToDutyStat("stat2", "duty1", 1);

     final CoordinatorStats stats2 = new CoordinatorStats();
     stats2.addToGlobalStat("stat1", 1);
     stats2.addToTieredStat("stat1", "tier2", 1);
     stats2.addToTieredStat("stat2", "tier2", 1);
     stats2.addToTieredStat("stat3", "tier1", 1);
+    stats2.addToDutyStat("stat1", "duty2", 1);
+    stats2.addToDutyStat("stat2", "duty2", 1);
+    stats2.addToDutyStat("stat3", "duty1", 1);

     stats.accumulate(stats2);
@@ -132,6 +138,11 @@ public class CoordinatorStatsTest
     Assert.assertEquals(1, stats.getTieredStat("stat2", "tier1"));
     Assert.assertEquals(1, stats.getTieredStat("stat2", "tier2"));
     Assert.assertEquals(1, stats.getTieredStat("stat3", "tier1"));
+    Assert.assertEquals(1, stats.getDutyStat("stat1", "duty1"));
+    Assert.assertEquals(2, stats.getDutyStat("stat1", "duty2"));
+    Assert.assertEquals(1, stats.getDutyStat("stat2", "duty1"));
+    Assert.assertEquals(1, stats.getDutyStat("stat2", "duty2"));
+    Assert.assertEquals(1, stats.getDutyStat("stat3", "duty1"));
   }

   @Test
@@ -167,4 +178,56 @@ public class CoordinatorStatsTest
     Assert.assertEquals(10, stats.getTieredStat("stat1", "tier2"));

   }
+
+  @Test(expected = NullPointerException.class)
+  public void testGetNonexistentDutyStat()
+  {
+    stats.getDutyStat("stat", "duty");
+  }
+
+  @Test
+  public void testAddToDutyStat()
+  {
+    Assert.assertFalse(stats.hasPerDutyStats());
+    stats.addToDutyStat("stat1", "duty1", 1);
+    stats.addToDutyStat("stat1", "duty2", 1);
+    stats.addToDutyStat("stat1", "duty1", -5);
+    stats.addToDutyStat("stat2", "duty1", 1);
+    stats.addToDutyStat("stat1", "duty2", 1);
+    Assert.assertTrue(stats.hasPerDutyStats());
+
+    Assert.assertEquals(
+        Sets.newHashSet("duty1", "duty2"),
+        stats.getDuties("stat1")
+    );
+    Assert.assertEquals(
+        Sets.newHashSet("duty1"),
+        stats.getDuties("stat2")
+    );
+    Assert.assertTrue(stats.getDuties("stat3").isEmpty());
+
+    Assert.assertEquals(-4, stats.getDutyStat("stat1", "duty1"));
+    Assert.assertEquals(2, stats.getDutyStat("stat1", "duty2"));
+    Assert.assertEquals(1, stats.getDutyStat("stat2", "duty1"));
+  }
+
+  @Test
+  public void testForEachDutyStat()
+  {
+    final Map<String, Long> expected = ImmutableMap.of(
+        "duty1", 1L,
+        "duty2", 2L,
+        "duty3", 3L
+    );
+    final Map<String, Long> actual = new HashMap<>();
+
+    expected.forEach(
+        (duty, count) -> stats.addToDutyStat("stat", duty, count)
+    );
+
+    stats.forEachDutyStat("stat0", (duty, count) -> Assert.fail());
+    stats.forEachDutyStat("stat", actual::put);
+
+    Assert.assertEquals(expected, actual);
+  }
 }
DruidCoordinatorTest.java

@@ -709,7 +709,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         ZkEnablementConfig.ENABLED
     );

-    DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0);
+    DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0, "TEST");
     // before initialization
     Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
     Assert.assertNull(c.getBalancerExec());