Improve alert message for segment assignments (#14696)

Changes:
- Add interface `SegmentDeleteHandler` for marking segments as unused
- In `StrategicSegmentAssigner`, collect all segments on which a drop rule applies in a list
- Process the list above as a batch delete rather than individual deletes
- Improve alert messages when an invalid tier is specified in a load rule
- Improve alert message when no rule applies on a segment
This commit is contained in:
Kashif Faraz 2023-08-01 23:33:05 +05:30 committed by GitHub
parent 153948198c
commit ee4e0c93b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 198 additions and 98 deletions

View File

@ -338,13 +338,6 @@ public class DruidCoordinator
return CoordinatorCompactionConfig.current(configManager);
}
public void markSegmentsAsUnused(String datasource, Set<SegmentId> segmentIds)
{
log.debug("Marking [%d] segments of datasource [%s] as unused.", segmentIds.size(), datasource);
int updatedCount = segmentsMetadataManager.markSegmentsAsUnused(segmentIds);
log.info("Successfully marked [%d] segments of datasource [%s] as unused.", updatedCount, datasource);
}
public String getCurrentLeader()
{
return coordLeaderSelector.getCurrentLeader();
@ -579,10 +572,10 @@ public class DruidCoordinator
{
return ImmutableList.of(
new UpdateCoordinatorStateAndPrepareCluster(),
new RunRules(),
new RunRules(segmentsMetadataManager::markSegmentsAsUnused),
new UpdateReplicationStatus(),
new UnloadUnusedSegments(loadQueueManager),
new MarkOvershadowedSegmentsAsUnused(DruidCoordinator.this),
new MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused),
new BalanceSegments(),
new CollectSegmentAndServerStats(DruidCoordinator.this)
);

View File

@ -27,7 +27,6 @@ import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
@ -60,9 +59,6 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
.forEach(this::logHistoricalTierStats);
collectSegmentStats(params);
StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
segmentAssigner.makeAlerts();
return params;
}

View File

@ -24,7 +24,6 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@ -42,15 +41,21 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Marks segments that are overshadowed by currently served segments as unused.
* This duty runs only if the Coordinator has been running long enough to have a
* refreshed metadata view. This duration is controlled by the dynamic config
* {@code millisToWaitBeforeDeleting}.
*/
public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
{
private static final Logger log = new Logger(MarkOvershadowedSegmentsAsUnused.class);
private final DruidCoordinator coordinator;
private final SegmentDeleteHandler deleteHandler;
public MarkOvershadowedSegmentsAsUnused(DruidCoordinator coordinator)
public MarkOvershadowedSegmentsAsUnused(SegmentDeleteHandler deleteHandler)
{
this.coordinator = coordinator;
this.deleteHandler = deleteHandler;
}
@Override
@ -104,7 +109,9 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
(datasource, unusedSegments) -> {
RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, datasource);
stats.add(Stats.Segments.OVERSHADOWED, datasourceKey, unusedSegments.size());
coordinator.markSegmentsAsUnused(datasource, unusedSegments);
int updatedCount = deleteHandler.markSegmentsAsUnused(unusedSegments);
log.info("Successfully marked [%d] segments of datasource[%s] as unused.", updatedCount, datasource);
}
);

View File

@ -19,9 +19,10 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.Lists;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.server.coordinator.DruidCluster;
@ -29,8 +30,11 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import java.util.List;
@ -38,7 +42,9 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
* Duty to run retention rules.
* Duty to run retention rules for all used non-overshadowed segments.
* Overshadowed segments are marked unused by {@link MarkOvershadowedSegmentsAsUnused}
* duty and are eventually unloaded from all servers by {@link UnloadUnusedSegments}.
* <p>
* The params returned from {@code run()} must have these fields initialized:
* <ul>
@ -49,7 +55,13 @@ import java.util.stream.Collectors;
public class RunRules implements CoordinatorDuty
{
private static final EmittingLogger log = new EmittingLogger(RunRules.class);
private static final int MAX_MISSING_RULES = 10;
private final SegmentDeleteHandler deleteHandler;
public RunRules(SegmentDeleteHandler deleteHandler)
{
this.deleteHandler = deleteHandler;
}
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
@ -60,10 +72,6 @@ public class RunRules implements CoordinatorDuty
return params;
}
// Get used segments which are overshadowed by other used segments. Those would not need to be loaded and
// eventually will be unloaded from Historical servers. Segments overshadowed by *served* used segments are marked
// as unused in MarkAsUnusedOvershadowedSegments, and then eventually Coordinator sends commands to Historical nodes
// to unload such segments in UnloadUnusedSegments.
final Set<DataSegment> overshadowed = params.getDataSourcesSnapshot().getOvershadowedSegments();
final Set<DataSegment> usedSegments = params.getUsedSegments();
log.info(
@ -74,16 +82,16 @@ public class RunRules implements CoordinatorDuty
final StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
final MetadataRuleManager databaseRuleManager = params.getDatabaseRuleManager();
int missingRules = 0;
final DateTime now = DateTimes.nowUtc();
final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
// Run through all matched rules for used segments
final Object2IntOpenHashMap<String> datasourceToSegmentsWithNoRule = new Object2IntOpenHashMap<>();
for (DataSegment segment : usedSegments) {
// Do not apply rules on overshadowed segments as they will be
// marked unused and eventually unloaded from all historicals
if (overshadowed.contains(segment)) {
// Skip overshadowed segments
continue;
}
// Find and apply matching rule
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false;
for (Rule rule : rules) {
@ -95,25 +103,59 @@ public class RunRules implements CoordinatorDuty
}
if (!foundMatchingRule) {
if (segmentsWithMissingRules.size() < MAX_MISSING_RULES) {
segmentsWithMissingRules.add(segment.getId());
}
missingRules++;
datasourceToSegmentsWithNoRule.addTo(segment.getDataSource(), 1);
}
}
if (!segmentsWithMissingRules.isEmpty()) {
log.makeAlert("Unable to find matching rules!")
.addData("segmentsWithMissingRulesCount", missingRules)
.addData("segmentsWithMissingRules", segmentsWithMissingRules)
.emit();
}
processSegmentDeletes(segmentAssigner, params.getCoordinatorStats());
alertForSegmentsWithNoRules(datasourceToSegmentsWithNoRule);
alertForInvalidRules(segmentAssigner);
return params.buildFromExisting()
.withBroadcastDatasources(getBroadcastDatasources(params))
.build();
}
private void processSegmentDeletes(
StrategicSegmentAssigner segmentAssigner,
CoordinatorRunStats runStats
)
{
segmentAssigner.getSegmentsToDelete().forEach((datasource, segmentIds) -> {
final Stopwatch stopwatch = Stopwatch.createStarted();
int numUpdatedSegments = deleteHandler.markSegmentsAsUnused(segmentIds);
RowKey rowKey = RowKey.of(Dimension.DATASOURCE, datasource);
runStats.add(Stats.Segments.DELETED, rowKey, numUpdatedSegments);
log.info(
"Successfully marked [%d] segments of datasource[%s] as unused in [%d]ms.",
numUpdatedSegments, datasource, stopwatch.millisElapsed()
);
});
}
private void alertForSegmentsWithNoRules(Object2IntOpenHashMap<String> datasourceToSegmentsWithNoRule)
{
datasourceToSegmentsWithNoRule.object2IntEntrySet().fastForEach(
entry -> log.noStackTrace().makeAlert(
"No matching retention rule for [%d] segments in datasource[%s]",
entry.getIntValue(), entry.getKey()
).emit()
);
}
private void alertForInvalidRules(StrategicSegmentAssigner segmentAssigner)
{
segmentAssigner.getDatasourceToInvalidLoadTiers().forEach(
(datasource, invalidTiers) -> log.makeAlert(
"Load rules for datasource[%s] refer to invalid tiers[%s]."
+ " Update the load rules or add servers for these tiers.",
datasource, invalidTiers
).emit()
);
}
private Set<String> getBroadcastDatasources(DruidCoordinatorRuntimeParams params)
{
final Set<String> broadcastDatasources =

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import org.apache.druid.timeline.SegmentId;
import java.util.Set;
public interface SegmentDeleteHandler
{
int markSegmentsAsUnused(Set<SegmentId> segmentIds);
}

View File

@ -33,7 +33,9 @@ import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@ -51,6 +53,7 @@ import java.util.stream.Collectors;
* <p>
* An instance of this class is freshly created for each coordinator run.
*/
@NotThreadSafe
public class StrategicSegmentAssigner implements SegmentActionHandler
{
private static final EmittingLogger log = new EmittingLogger(StrategicSegmentAssigner.class);
@ -65,8 +68,9 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
private final boolean useRoundRobinAssignment;
private final Set<String> tiersWithNoServer = new HashSet<>();
private final Map<String, Set<String>> datasourceToInvalidLoadTiers = new HashMap<>();
private final Map<String, Integer> tierToHistoricalCount = new HashMap<>();
private final Map<String, Set<SegmentId>> segmentsToDelete = new HashMap<>();
public StrategicSegmentAssigner(
SegmentLoadQueueManager loadQueueManager,
@ -100,11 +104,14 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
return replicaCountMap.toReplicationStatus();
}
public void makeAlerts()
public Map<String, Set<SegmentId>> getSegmentsToDelete()
{
if (!tiersWithNoServer.isEmpty()) {
log.makeAlert("Tiers [%s] have no servers! Check your cluster configuration.", tiersWithNoServer).emit();
}
return segmentsToDelete;
}
public Map<String, Set<String>> getDatasourceToInvalidLoadTiers()
{
return datasourceToInvalidLoadTiers;
}
/**
@ -197,7 +204,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
{
final Set<String> allTiersInCluster = Sets.newHashSet(cluster.getTierNames());
if (tierToReplicaCount == null || tierToReplicaCount.isEmpty()) {
if (tierToReplicaCount.isEmpty()) {
// Track the counts for a segment even if it requires 0 replicas on all tiers
replicaCountMap.computeIfAbsent(segment.getId(), DruidServer.DEFAULT_TIER);
} else {
@ -209,7 +216,8 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
replicaCount.setRequired(requiredReplicas, tierToHistoricalCount.getOrDefault(tier, 0));
if (!allTiersInCluster.contains(tier)) {
tiersWithNoServer.add(tier);
datasourceToInvalidLoadTiers.computeIfAbsent(segment.getDataSource(), ds -> new HashSet<>())
.add(tier);
}
});
}
@ -349,9 +357,9 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
@Override
public void deleteSegment(DataSegment segment)
{
loadQueueManager.deleteSegment(segment);
RowKey rowKey = RowKey.of(Dimension.DATASOURCE, segment.getDataSource());
stats.add(Stats.Segments.DELETED, rowKey, 1);
segmentsToDelete
.computeIfAbsent(segment.getDataSource(), ds -> new HashSet<>())
.add(segment.getId());
}
/**

View File

@ -46,6 +46,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
/**
* TODO convert benchmarks to JMH
@ -143,7 +144,7 @@ public class BalanceSegmentsProfiler
.build();
BalanceSegments tester = new BalanceSegments();
RunRules runner = new RunRules();
RunRules runner = new RunRules(Set::size);
watch.start();
DruidCoordinatorRuntimeParams balanceParams = tester.run(params);
DruidCoordinatorRuntimeParams assignParams = runner.run(params);

View File

@ -19,40 +19,41 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.simulate.TestSegmentsMetadataManager;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@RunWith(JUnitParamsRunner.class)
public class MarkOvershadowedSegmentsAsUnusedTest
{
private final DruidCoordinator coordinator = EasyMock.createStrictMock(DruidCoordinator.class);
private final DateTime start = DateTimes.of("2012-01-01");
private final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
private final ImmutableDruidDataSource druidDataSource = EasyMock.createMock(ImmutableDruidDataSource.class);
private final DataSegment segmentV0 = DataSegment.builder().dataSource("test")
.interval(new Interval(start, start.plusHours(1)))
.version("0")
@ -61,44 +62,65 @@ public class MarkOvershadowedSegmentsAsUnusedTest
private final DataSegment segmentV1 = segmentV0.withVersion("1");
private final DataSegment segmentV2 = segmentV0.withVersion("2");
private TestSegmentsMetadataManager segmentsMetadataManager;
@Before
public void setup()
{
segmentsMetadataManager = new TestSegmentsMetadataManager();
}
@Test
@Parameters({"historical", "broker"})
public void testRun(String serverTypeString)
public void testRun(String serverType)
{
ServerType serverType = ServerType.fromString(serverTypeString);
segmentsMetadataManager.addSegment(segmentV0);
segmentsMetadataManager.addSegment(segmentV1);
segmentsMetadataManager.addSegment(segmentV2);
MarkOvershadowedSegmentsAsUnused markOvershadowedSegmentsAsUnused =
new MarkOvershadowedSegmentsAsUnused(coordinator);
final List<DataSegment> usedSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2);
// Dummy values for comparisons in TreeSet
EasyMock.expect(mockPeon.getSegmentsInQueue())
.andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop())
.andReturn(Collections.emptySet()).anyTimes();
final ImmutableDruidServer druidServer = new DruidServer("", "", "", 0L, serverType, "", 0)
.addDataSegment(segmentV1)
.addDataSegment(segmentV2)
.toImmutableDruidServer();
coordinator.markSegmentsAsUnused("test", ImmutableSet.of(segmentV1.getId(), segmentV0.getId()));
EasyMock.expectLastCall();
EasyMock.replay(mockPeon, coordinator, druidDataSource);
final ImmutableDruidServer druidServer =
new DruidServer("", "", "", 0L, ServerType.fromString(serverType), "", 0)
.addDataSegment(segmentV1)
.addDataSegment(segmentV2)
.toImmutableDruidServer();
DruidCluster druidCluster = DruidCluster
.builder()
.addTier("normal", new ServerHolder(druidServer, mockPeon))
.add(new ServerHolder(druidServer, new TestLoadQueuePeon()))
.build();
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.withUsedSegmentsInTest(usedSegments)
.withSnapshotOfDataSourcesWithAllUsedSegments(
segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()
)
.withDruidCluster(druidCluster)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMarkSegmentAsUnusedDelayMillis(0).build()
)
.withBalancerStrategy(new RandomBalancerStrategy())
.withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, null))
.build();
markOvershadowedSegmentsAsUnused.run(params);
EasyMock.verify(coordinator, druidDataSource);
SegmentTimeline timeline = segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()
.getUsedSegmentsTimelinesPerDataSource()
.get("test");
// Verify that the segments V0 and V1 are overshadowed
Assert.assertTrue(timeline.isOvershadowed(segmentV0));
Assert.assertTrue(timeline.isOvershadowed(segmentV1));
// Run the duty and verify that the overshadowed segments are marked unused
params = new MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused).run(params);
Set<DataSegment> updatedUsedSegments = Sets.newHashSet(segmentsMetadataManager.iterateAllUsedSegments());
Assert.assertEquals(1, updatedUsedSegments.size());
Assert.assertTrue(updatedUsedSegments.contains(segmentV2));
CoordinatorRunStats runStats = params.getCoordinatorStats();
Assert.assertEquals(
2L,
runStats.get(Stats.Segments.OVERSHADOWED, RowKey.of(Dimension.DATASOURCE, "test"))
);
}
}

View File

@ -29,7 +29,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
@ -69,7 +68,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -104,7 +102,7 @@ public class RunRulesTest
EmittingLogger.registerEmitter(emitter);
databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
ruleRunner = new RunRules();
ruleRunner = new RunRules(Set::size);
loadQueueManager = new SegmentLoadQueueManager(null, segmentsMetadataManager, null);
balancerExecutor = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
}
@ -511,20 +509,22 @@ public class RunRulesTest
.add(createServerHolder("serverNorm", "normal", mockPeon))
.build();
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster).build();
DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster)
.withBalancerStrategy(new CostBalancerStrategy(balancerExecutor))
.withSegmentAssignerUsing(loadQueueManager)
.build();
runDutyAndGetStats(params);
final List<Event> events = emitter.getEvents();
final List<AlertEvent> events = emitter.getAlerts();
Assert.assertEquals(1, events.size());
AlertEvent alertEvent = (AlertEvent) events.get(0);
AlertEvent alertEvent = events.get(0);
EventMap eventMap = alertEvent.toMap();
Assert.assertEquals("Unable to find matching rules!", eventMap.get("description"));
Map<String, Object> dataMap = alertEvent.getDataMap();
Assert.assertEquals(usedSegments.size(), dataMap.get("segmentsWithMissingRulesCount"));
Assert.assertEquals(
"No matching retention rule for [24] segments in datasource[test]",
eventMap.get("description")
);
EasyMock.verify(mockPeon);
}