diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 8c060ee321f..6b1e29d4916 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -169,7 +169,7 @@ public class DruidCoordinator private int cachedBalancerThreadNumber; private ListeningExecutorService balancerExec; - private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties"; + public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties"; private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties"; private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties"; private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties"; @@ -765,8 +765,7 @@ public class DruidCoordinator new RunRules(DruidCoordinator.this), new UnloadUnusedSegments(), new MarkAsUnusedOvershadowedSegments(DruidCoordinator.this), - new BalanceSegments(DruidCoordinator.this), - new EmitClusterStatsAndMetrics(DruidCoordinator.this) + new BalanceSegments(DruidCoordinator.this) ); } @@ -841,7 +840,17 @@ public class DruidCoordinator protected DutiesRunnable(List duties, final int startingLeaderCounter, String alias) { - this.duties = duties; + // Automatically add EmitClusterStatsAndMetrics duty to the group if it does not already exists + // This is to avoid human coding error (forgetting to add the EmitClusterStatsAndMetrics duty to the group) + // causing metrics from the duties to not being emitted. + if (duties.stream().noneMatch(duty -> duty instanceof EmitClusterStatsAndMetrics)) { + boolean isContainCompactSegmentDuty = duties.stream().anyMatch(duty -> duty instanceof CompactSegments); + List allDuties = new ArrayList<>(duties); + allDuties.add(new EmitClusterStatsAndMetrics(DruidCoordinator.this, alias, isContainCompactSegmentDuty)); + this.duties = allDuties; + } else { + this.duties = duties; + } this.startingLeaderCounter = startingLeaderCounter; this.dutiesRunnableAlias = alias; } @@ -958,6 +967,12 @@ public class DruidCoordinator log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit(); } } + + @VisibleForTesting + public List getDuties() + { + return duties; + } } /** diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java index 1f6cab3ec5b..71a18ce3948 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java @@ -48,10 +48,14 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty public static final String MAX_REPLICATION_FACTOR = "maxReplicationFactor"; private final DruidCoordinator coordinator; + private final String groupName; + private final boolean isContainCompactSegmentDuty; - public EmitClusterStatsAndMetrics(DruidCoordinator coordinator) + public EmitClusterStatsAndMetrics(DruidCoordinator coordinator, String groupName, boolean isContainCompactSegmentDuty) { this.coordinator = coordinator; + this.groupName = groupName; + this.isContainCompactSegmentDuty = isContainCompactSegmentDuty; } private void emitTieredStat( @@ -64,6 +68,7 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty emitter.emit( new ServiceMetricEvent.Builder() .setDimension(DruidMetrics.TIER, tier) + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .build(metricName, value) ); } @@ -78,6 +83,7 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty emitter.emit( new ServiceMetricEvent.Builder() .setDimension(DruidMetrics.TIER, tier) + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .build(metricName, value) ); } @@ -107,6 +113,7 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty emitter.emit( new ServiceMetricEvent.Builder() .setDimension(DruidMetrics.DUTY, duty) + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .build(metricName, value) ); } @@ -133,6 +140,21 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty CoordinatorStats stats = params.getCoordinatorStats(); ServiceEmitter emitter = params.getEmitter(); + if (DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP.equals(groupName)) { + emitStatsForHistoricalManagementDuties(cluster, stats, emitter, params); + } + if (isContainCompactSegmentDuty) { + emitStatsForCompactSegments(cluster, stats, emitter); + } + + // Emit coordinator runtime stats + emitDutyStats(emitter, "coordinator/time", stats, "runtime"); + + return params; + } + + private void emitStatsForHistoricalManagementDuties(DruidCluster cluster, CoordinatorStats stats, ServiceEmitter emitter, DruidCoordinatorRuntimeParams params) + { stats.forEachTieredStat( "assignedCount", (final String tier, final long count) -> { @@ -190,7 +212,9 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty ); emitter.emit( - new ServiceMetricEvent.Builder().build( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .build( "segment/overShadowed/count", stats.getGlobalStat("overShadowedCount") ) @@ -269,24 +293,28 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty .forEach((final String serverName, final LoadQueuePeon queuePeon) -> { emitter.emit( new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .setDimension(DruidMetrics.SERVER, serverName).build( "segment/loadQueue/size", queuePeon.getLoadQueueSize() ) ); emitter.emit( new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .setDimension(DruidMetrics.SERVER, serverName).build( "segment/loadQueue/failed", queuePeon.getAndResetFailedAssignCount() ) ); emitter.emit( new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .setDimension(DruidMetrics.SERVER, serverName).build( "segment/loadQueue/count", queuePeon.getSegmentsToLoad().size() ) ); emitter.emit( new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .setDimension(DruidMetrics.SERVER, serverName).build( "segment/dropQueue/count", queuePeon.getSegmentsToDrop().size() ) @@ -299,6 +327,7 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty final int numUnavailableUsedSegmentsInDataSource = entry.getIntValue(); emitter.emit( new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .setDimension(DruidMetrics.DATASOURCE, dataSource).build( "segment/unavailable/count", numUnavailableUsedSegmentsInDataSource ) @@ -314,6 +343,7 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty emitter.emit( new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .setDimension(DruidMetrics.TIER, tier) .setDimension(DruidMetrics.DATASOURCE, dataSource).build( "segment/underReplicated/count", underReplicationCount @@ -323,126 +353,6 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty } ); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "compact/task/count", - stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT) - ) - ); - - emitter.emit( - new ServiceMetricEvent.Builder().build( - "compactTask/maxSlot/count", - stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT) - ) - ); - - emitter.emit( - new ServiceMetricEvent.Builder().build( - "compactTask/availableSlot/count", - stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT) - ) - ); - - stats.forEachDataSourceStat( - CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING, - (final String dataSource, final long count) -> { - emitter.emit( - new ServiceMetricEvent.Builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .build("segment/waitCompact/bytes", count) - ); - } - ); - - stats.forEachDataSourceStat( - CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING, - (final String dataSource, final long count) -> { - emitter.emit( - new ServiceMetricEvent.Builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .build("segment/waitCompact/count", count) - ); - } - ); - - stats.forEachDataSourceStat( - CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING, - (final String dataSource, final long count) -> { - emitter.emit( - new ServiceMetricEvent.Builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .build("interval/waitCompact/count", count) - ); - } - ); - - stats.forEachDataSourceStat( - CompactSegments.TOTAL_SIZE_OF_SEGMENTS_SKIPPED, - (final String dataSource, final long count) -> { - emitter.emit( - new ServiceMetricEvent.Builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .build("segment/skipCompact/bytes", count) - ); - } - ); - - stats.forEachDataSourceStat( - CompactSegments.TOTAL_COUNT_OF_SEGMENTS_SKIPPED, - (final String dataSource, final long count) -> { - emitter.emit( - new ServiceMetricEvent.Builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .build("segment/skipCompact/count", count) - ); - } - ); - - stats.forEachDataSourceStat( - CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED, - (final String dataSource, final long count) -> { - emitter.emit( - new ServiceMetricEvent.Builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .build("interval/skipCompact/count", count) - ); - } - ); - - stats.forEachDataSourceStat( - CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED, - (final String dataSource, final long count) -> { - emitter.emit( - new ServiceMetricEvent.Builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .build("segment/compacted/bytes", count) - ); - } - ); - - stats.forEachDataSourceStat( - CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED, - (final String dataSource, final long count) -> { - emitter.emit( - new ServiceMetricEvent.Builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .build("segment/compacted/count", count) - ); - } - ); - - stats.forEachDataSourceStat( - CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED, - (final String dataSource, final long count) -> { - emitter.emit( - new ServiceMetricEvent.Builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .build("interval/compacted/count", count) - ); - } - ); - // Emit segment metrics params.getUsedSegmentsTimelinesPerDataSource().forEach( (String dataSource, VersionedIntervalTimeline dataSourceWithUsedSegments) -> { @@ -453,20 +363,155 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty .sum(); emitter.emit( new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .setDimension(DruidMetrics.DATASOURCE, dataSource) .build("segment/size", totalSizeOfUsedSegments) ); emitter.emit( new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) .setDimension(DruidMetrics.DATASOURCE, dataSource) .build("segment/count", dataSourceWithUsedSegments.getNumObjects()) ); } ); + } - // Emit coordinator runtime stats - emitDutyStats(emitter, "coordinator/time", stats, "runtime"); + private void emitStatsForCompactSegments(DruidCluster cluster, CoordinatorStats stats, ServiceEmitter emitter) + { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .build( + "compact/task/count", + stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT) + ) + ); - return params; + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .build( + "compactTask/maxSlot/count", + stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT) + ) + ); + + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .build( + "compactTask/availableSlot/count", + stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT) + ) + ); + + stats.forEachDataSourceStat( + CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING, + (final String dataSource, final long count) -> { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .build("segment/waitCompact/bytes", count) + ); + } + ); + + stats.forEachDataSourceStat( + CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING, + (final String dataSource, final long count) -> { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .build("segment/waitCompact/count", count) + ); + } + ); + + stats.forEachDataSourceStat( + CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING, + (final String dataSource, final long count) -> { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .build("interval/waitCompact/count", count) + ); + } + ); + + stats.forEachDataSourceStat( + CompactSegments.TOTAL_SIZE_OF_SEGMENTS_SKIPPED, + (final String dataSource, final long count) -> { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .build("segment/skipCompact/bytes", count) + ); + } + ); + + stats.forEachDataSourceStat( + CompactSegments.TOTAL_COUNT_OF_SEGMENTS_SKIPPED, + (final String dataSource, final long count) -> { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .build("segment/skipCompact/count", count) + ); + } + ); + + stats.forEachDataSourceStat( + CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED, + (final String dataSource, final long count) -> { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .build("interval/skipCompact/count", count) + ); + } + ); + + stats.forEachDataSourceStat( + CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED, + (final String dataSource, final long count) -> { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .build("segment/compacted/bytes", count) + ); + } + ); + + stats.forEachDataSourceStat( + CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED, + (final String dataSource, final long count) -> { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .build("segment/compacted/count", count) + ); + } + ); + + stats.forEachDataSourceStat( + CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED, + (final String dataSource, final long count) -> { + emitter.emit( + new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DUTY_GROUP, groupName) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .build("interval/compacted/count", count) + ); + } + ); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 036374828e6..7a12a7a2bbd 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -57,7 +57,9 @@ import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; import org.apache.druid.server.coordinator.duty.CoordinatorDuty; +import org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics; import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty; +import org.apache.druid.server.coordinator.duty.LogUsedSegments; import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; import org.apache.druid.server.coordinator.rules.ForeverLoadRule; import org.apache.druid.server.coordinator.rules.IntervalLoadRule; @@ -1133,6 +1135,34 @@ public class DruidCoordinatorTest extends CuratorTestBase latch2.await(); } + @Test + public void testEmitClusterStatsAndMetricsAddedWhenNotInDutyList() + { + DruidCoordinator.DutiesRunnable dutyRunnable = coordinator.new DutiesRunnable(ImmutableList.of(new LogUsedSegments()), 0, "TEST"); + List duties = dutyRunnable.getDuties(); + int emitDutyFound = 0; + for (CoordinatorDuty duty : duties) { + if (duty instanceof EmitClusterStatsAndMetrics) { + emitDutyFound++; + } + } + Assert.assertEquals(1, emitDutyFound); + } + + @Test + public void testEmitClusterStatsAndMetricsNotAddedAgainWhenInDutyList() + { + DruidCoordinator.DutiesRunnable dutyRunnable = coordinator.new DutiesRunnable(ImmutableList.of(new LogUsedSegments(), new EmitClusterStatsAndMetrics(coordinator, "TEST", false)), 0, "TEST"); + List duties = dutyRunnable.getDuties(); + int emitDutyFound = 0; + for (CoordinatorDuty duty : duties) { + if (duty instanceof EmitClusterStatsAndMetrics) { + emitDutyFound++; + } + } + Assert.assertEquals(1, emitDutyFound); + } + private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount, PathChildrenCache pathChildrenCache, Map segments, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetricsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetricsTest.java new file mode 100644 index 00000000000..00061b1f75b --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetricsTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.collect.ImmutableMap; +import it.unimi.dsi.fastutil.objects.Object2IntMaps; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.metadata.MetadataRuleManager; +import org.apache.druid.server.coordinator.CoordinatorStats; +import org.apache.druid.server.coordinator.DruidCluster; +import org.apache.druid.server.coordinator.DruidCoordinator; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.List; + +@RunWith(MockitoJUnitRunner.class) +public class EmitClusterStatsAndMetricsTest +{ + @Mock + private ServiceEmitter mockServiceEmitter; + @Mock + private DruidCoordinator mockDruidCoordinator; + @Mock + private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams; + @Mock + CoordinatorStats mockCoordinatorStats; + @Mock + DruidCluster mockDruidCluster; + @Mock + MetadataRuleManager mockMetadataRuleManager; + + @Test + public void testRunOnlyEmitStatsForHistoricalDuties() + { + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class); + Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); + Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(mockCoordinatorStats); + Mockito.when(mockDruidCoordinatorRuntimeParams.getDruidCluster()).thenReturn(mockDruidCluster); + Mockito.when(mockDruidCoordinatorRuntimeParams.getDatabaseRuleManager()).thenReturn(mockMetadataRuleManager); + Mockito.when(mockDruidCoordinator.computeNumsUnavailableUsedSegmentsPerDataSource()).thenReturn(Object2IntMaps.emptyMap()); + Mockito.when(mockDruidCoordinator.computeUnderReplicationCountsPerDataSourcePerTier()).thenReturn(ImmutableMap.of()); + CoordinatorDuty duty = new EmitClusterStatsAndMetrics(mockDruidCoordinator, DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP, false); + duty.run(mockDruidCoordinatorRuntimeParams); + Mockito.verify(mockServiceEmitter, Mockito.atLeastOnce()).emit(argumentCaptor.capture()); + List emittedEvents = argumentCaptor.getAllValues(); + boolean foundCompactMetric = false; + boolean foundHistoricalDutyMetric = false; + for (ServiceEventBuilder eventBuilder : emittedEvents) { + ServiceMetricEvent serviceMetricEvent = ((ServiceMetricEvent) eventBuilder.build("x", "x")); + String metric = serviceMetricEvent.getMetric(); + if ("segment/overShadowed/count".equals(metric)) { + foundHistoricalDutyMetric = true; + } else if ("compact/task/count".equals(metric)) { + foundCompactMetric = true; + } + String dutyGroup = (String) serviceMetricEvent.getUserDims().get("dutyGroup"); + Assert.assertNotNull(dutyGroup); + Assert.assertEquals(DruidCoordinator.HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP, dutyGroup); + } + Assert.assertTrue(foundHistoricalDutyMetric); + Assert.assertFalse(foundCompactMetric); + } + + @Test + public void testRunEmitStatsForCompactionWhenHaveCompactSegmentDuty() + { + String groupName = "blah"; + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class); + Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); + Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(mockCoordinatorStats); + Mockito.when(mockDruidCoordinatorRuntimeParams.getDruidCluster()).thenReturn(mockDruidCluster); + CoordinatorDuty duty = new EmitClusterStatsAndMetrics(mockDruidCoordinator, groupName, true); + duty.run(mockDruidCoordinatorRuntimeParams); + Mockito.verify(mockServiceEmitter, Mockito.atLeastOnce()).emit(argumentCaptor.capture()); + List emittedEvents = argumentCaptor.getAllValues(); + boolean foundCompactMetric = false; + boolean foundHistoricalDutyMetric = false; + for (ServiceEventBuilder eventBuilder : emittedEvents) { + ServiceMetricEvent serviceMetricEvent = ((ServiceMetricEvent) eventBuilder.build("x", "x")); + String metric = serviceMetricEvent.getMetric(); + if ("segment/overShadowed/count".equals(metric)) { + foundHistoricalDutyMetric = true; + } else if ("compact/task/count".equals(metric)) { + foundCompactMetric = true; + } + String dutyGroup = (String) serviceMetricEvent.getUserDims().get("dutyGroup"); + Assert.assertNotNull(dutyGroup); + Assert.assertEquals(groupName, dutyGroup); + } + Assert.assertFalse(foundHistoricalDutyMetric); + Assert.assertTrue(foundCompactMetric); + } +}