Fix Flakiness in KafkaEmitterTest (#15907)

* thrust of the fix to allow for the json values to be out of order

The existing problem is that toMap doesn't turn some values into json primitive
values, for example segmentMetadata just has DateTime objects for it's time in
the EventMap, but Alert event converts those into strings when calling toMap.
This creates an issue because when we check the emitted events the mapper
deserializing the string value for dateTime leaves it as a string in the
EventMap. So the question is do we alter the events toMap() to return string/map
version of objects or to make the expected events do a round trip of
eventMap -> string -> eventMap to turn everything into json primitives

* fix issue by making toMap events convert Objects into strings, or maps

* fix linting errors

* use method of using mapper to round trip expected data to make it have same type
as those of the events emitted

* remove unnecessary comment
This commit is contained in:
Tom 2024-02-14 20:31:55 -08:00 committed by GitHub
parent c98d54f3c4
commit f224035c7e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 41 additions and 39 deletions

View File

@ -54,6 +54,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -135,7 +137,7 @@ public class KafkaEmitterTest
* events are emitted without any drops. * events are emitted without any drops.
*/ */
@Test(timeout = 10_000) @Test(timeout = 10_000)
public void testServiceMetricEvents() throws JsonProcessingException, InterruptedException public void testServiceMetricEvents() throws InterruptedException, JsonProcessingException
{ {
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"", "",
@ -155,12 +157,12 @@ public class KafkaEmitterTest
final List<Event> inputEvents = flattenEvents(SERVICE_METRIC_EVENTS); final List<Event> inputEvents = flattenEvents(SERVICE_METRIC_EVENTS);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size()); final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed( final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents, inputEvents,
kafkaEmitterConfig.getClusterName(), kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions() kafkaEmitterConfig.getExtraDimensions()
); );
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch); final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch); emitEvents(kafkaEmitter, inputEvents, eventLatch);
@ -180,7 +182,7 @@ public class KafkaEmitterTest
* events are emitted without any drops. * events are emitted without any drops.
*/ */
@Test(timeout = 10_000) @Test(timeout = 10_000)
public void testAllEvents() throws JsonProcessingException, InterruptedException public void testAllEvents() throws InterruptedException, JsonProcessingException
{ {
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"", "",
@ -205,12 +207,12 @@ public class KafkaEmitterTest
); );
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size()); final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed( final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents, inputEvents,
kafkaEmitterConfig.getClusterName(), kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions() kafkaEmitterConfig.getExtraDimensions()
); );
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch); final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch); emitEvents(kafkaEmitter, inputEvents, eventLatch);
@ -230,7 +232,7 @@ public class KafkaEmitterTest
* event types should be emitted without any drops. * event types should be emitted without any drops.
*/ */
@Test(timeout = 10_000) @Test(timeout = 10_000)
public void testDefaultEvents() throws JsonProcessingException, InterruptedException public void testDefaultEvents() throws InterruptedException, JsonProcessingException
{ {
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"", "",
@ -253,12 +255,12 @@ public class KafkaEmitterTest
); );
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size()); final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed( final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents, inputEvents,
kafkaEmitterConfig.getClusterName(), kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions() kafkaEmitterConfig.getExtraDimensions()
); );
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch); final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch); emitEvents(kafkaEmitter, inputEvents, eventLatch);
@ -278,7 +280,7 @@ public class KafkaEmitterTest
* should be emitted, and everything else should be dropped. * should be emitted, and everything else should be dropped.
*/ */
@Test(timeout = 10_000) @Test(timeout = 10_000)
public void testAlertsPlusUnsubscribedEvents() throws JsonProcessingException, InterruptedException public void testAlertsPlusUnsubscribedEvents() throws InterruptedException, JsonProcessingException
{ {
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"", "",
@ -305,12 +307,12 @@ public class KafkaEmitterTest
final CountDownLatch eventLatch = new CountDownLatch(ALERT_EVENTS.size()); final CountDownLatch eventLatch = new CountDownLatch(ALERT_EVENTS.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed( final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
ALERT_EVENTS, ALERT_EVENTS,
kafkaEmitterConfig.getClusterName(), kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions() kafkaEmitterConfig.getExtraDimensions()
); );
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch); final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch); emitEvents(kafkaEmitter, inputEvents, eventLatch);
@ -335,7 +337,7 @@ public class KafkaEmitterTest
* </p> * </p>
*/ */
@Test(timeout = 10_000) @Test(timeout = 10_000)
public void testAllEventsWithCommonTopic() throws JsonProcessingException, InterruptedException public void testAllEventsWithCommonTopic() throws InterruptedException, JsonProcessingException
{ {
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"", "",
@ -361,12 +363,12 @@ public class KafkaEmitterTest
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size()); final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed( final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents, inputEvents,
kafkaEmitterConfig.getClusterName(), kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions() kafkaEmitterConfig.getExtraDimensions()
); );
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch); final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch); emitEvents(kafkaEmitter, inputEvents, eventLatch);
@ -386,7 +388,7 @@ public class KafkaEmitterTest
* </p> * </p>
*/ */
@Test(timeout = 10_000) @Test(timeout = 10_000)
public void testUnknownEvents() throws JsonProcessingException, InterruptedException public void testUnknownEvents() throws InterruptedException, JsonProcessingException
{ {
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig( final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"", "",
@ -410,12 +412,12 @@ public class KafkaEmitterTest
final CountDownLatch eventLatch = new CountDownLatch(SERVICE_METRIC_EVENTS.size()); final CountDownLatch eventLatch = new CountDownLatch(SERVICE_METRIC_EVENTS.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed( final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
SERVICE_METRIC_EVENTS, SERVICE_METRIC_EVENTS,
kafkaEmitterConfig.getClusterName(), kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions() kafkaEmitterConfig.getExtraDimensions()
); );
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch); final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch); emitEvents(kafkaEmitter, inputEvents, eventLatch);
@ -443,7 +445,7 @@ public class KafkaEmitterTest
); );
final ImmutableMap<String, String> extraDimensions = ImmutableMap.of("clusterId", "cluster-101"); final ImmutableMap<String, String> extraDimensions = ImmutableMap.of("clusterId", "cluster-101");
final Map<String, List<String>> feedToAllEventsBeforeDrop = trackExpectedEventsPerFeed( final Map<String, List<EventMap>> feedToAllEventsBeforeDrop = trackExpectedEventsPerFeed(
inputEvents, inputEvents,
null, null,
extraDimensions extraDimensions
@ -469,15 +471,15 @@ public class KafkaEmitterTest
// we should track the minimum buffer size per feed, compute the global maximum across all the feeds and prune the // we should track the minimum buffer size per feed, compute the global maximum across all the feeds and prune the
// expected set of events accordingly. For the sake of testing simplicity, we skip that for now. // expected set of events accordingly. For the sake of testing simplicity, we skip that for now.
int totalBufferSize = 0; int totalBufferSize = 0;
for (final List<String> feedEvents : feedToAllEventsBeforeDrop.values()) { for (final List<EventMap> feedEvents : feedToAllEventsBeforeDrop.values()) {
for (int idx = 0; idx < feedEvents.size() - bufferEventsDrop; idx++) { for (int idx = 0; idx < feedEvents.size() - bufferEventsDrop; idx++) {
totalBufferSize += feedEvents.get(idx).getBytes(StandardCharsets.UTF_8).length; totalBufferSize += MAPPER.writeValueAsString(feedEvents.get(idx)).getBytes(StandardCharsets.UTF_8).length;
} }
} }
final Map<String, List<String>> feedToExpectedEvents = new HashMap<>(); final Map<String, List<EventMap>> feedToExpectedEvents = new HashMap<>();
for (final Map.Entry<String, List<String>> expectedEvent : feedToAllEventsBeforeDrop.entrySet()) { for (final Map.Entry<String, List<EventMap>> expectedEvent : feedToAllEventsBeforeDrop.entrySet()) {
List<String> expectedEvents = expectedEvent.getValue(); List<EventMap> expectedEvents = expectedEvent.getValue();
feedToExpectedEvents.put(expectedEvent.getKey(), expectedEvents.subList(0, expectedEvents.size() - bufferEventsDrop)); feedToExpectedEvents.put(expectedEvent.getKey(), expectedEvents.subList(0, expectedEvents.size() - bufferEventsDrop));
} }
@ -497,7 +499,7 @@ public class KafkaEmitterTest
final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig); final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size() - bufferEventsDrop); final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size() - bufferEventsDrop);
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch); final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch); emitEvents(kafkaEmitter, inputEvents, eventLatch);
@ -554,19 +556,20 @@ public class KafkaEmitterTest
return flattenedList; return flattenedList;
} }
private Map<String, List<String>> trackActualEventsPerFeed( private Map<String, List<EventMap>> trackActualEventsPerFeed(
final CountDownLatch eventLatch final CountDownLatch eventLatch
) )
{ {
// A concurrent hashmap because the producer callback can trigger concurrently and can override the map initialization // A concurrent hashmap because the producer callback can trigger concurrently and can override the map initialization
final ConcurrentHashMap<String, List<String>> feedToActualEvents = new ConcurrentHashMap<>(); final ConcurrentHashMap<String, List<EventMap>> feedToActualEvents = new ConcurrentHashMap<>();
when(producer.send(any(), any())).then((invocation) -> { when(producer.send(any(), any())).then((invocation) -> {
final ProducerRecord<?, ?> producerRecord = invocation.getArgument(0); final ProducerRecord<?, ?> producerRecord = invocation.getArgument(0);
final String value = String.valueOf(producerRecord.value()); final String value = String.valueOf(producerRecord.value());
final EventMap eventMap = MAPPER.readValue(value, EventMap.class); final EventMap eventMap = MAPPER.readValue(value, EventMap.class);
feedToActualEvents.computeIfAbsent( feedToActualEvents.computeIfAbsent(
(String) eventMap.get("feed"), k -> new ArrayList<>() (String) eventMap.get("feed"), k -> new ArrayList<>()
).add(value); ).add(eventMap);
eventLatch.countDown(); eventLatch.countDown();
return null; return null;
@ -574,38 +577,37 @@ public class KafkaEmitterTest
return feedToActualEvents; return feedToActualEvents;
} }
private Map<String, List<String>> trackExpectedEventsPerFeed( private Map<String, List<EventMap>> trackExpectedEventsPerFeed(
final List<Event> events, final List<Event> events,
final String clusterName, final String clusterName,
final Map<String, String> extraDimensions final Map<String, String> extraDimensions
) throws JsonProcessingException ) throws JsonProcessingException
{ {
final Map<String, List<String>> feedToExpectedEvents = new HashMap<>(); final Map<String, List<EventMap>> feedToExpectedEvents = new HashMap<>();
for (final Event event : events) { for (final Event event : events) {
final EventMap eventMap = event.toMap(); final EventMap eventMap = MAPPER.readValue(MAPPER.writeValueAsString(event.toMap()), EventMap.class);
eventMap.computeIfAbsent("clusterName", k -> clusterName); eventMap.computeIfAbsent("clusterName", k -> clusterName);
if (extraDimensions != null) { if (extraDimensions != null) {
eventMap.putAll(extraDimensions); eventMap.putAll(extraDimensions);
} }
feedToExpectedEvents.computeIfAbsent( feedToExpectedEvents.computeIfAbsent(
event.getFeed(), k -> new ArrayList<>()).add(MAPPER.writeValueAsString(eventMap) event.getFeed(), k -> new ArrayList<>()).add(eventMap);
);
} }
return feedToExpectedEvents; return feedToExpectedEvents;
} }
private void validateEvents( private void validateEvents(
final Map<String, List<String>> feedToExpectedEvents, final Map<String, List<EventMap>> feedToExpectedEvents,
final Map<String, List<String>> feedToActualEvents final Map<String, List<EventMap>> feedToActualEvents
) )
{ {
Assert.assertEquals(feedToExpectedEvents.size(), feedToActualEvents.size()); Assert.assertEquals(feedToExpectedEvents.size(), feedToActualEvents.size());
for (final Map.Entry<String, List<String>> actualEntry : feedToActualEvents.entrySet()) { for (final Map.Entry<String, List<EventMap>> actualEntry : feedToActualEvents.entrySet()) {
final String feed = actualEntry.getKey(); final String feed = actualEntry.getKey();
final List<String> actualEvents = actualEntry.getValue(); final List<EventMap> actualEvents = actualEntry.getValue();
final List<String> expectedEvents = feedToExpectedEvents.get(feed); final List<EventMap> expectedEvents = feedToExpectedEvents.get(feed);
Assert.assertEquals(expectedEvents, actualEvents); assertThat(actualEvents, containsInAnyOrder(expectedEvents.toArray(new Map[0])));
} }
} }