From f224035c7e2121ae726bf9b04dd69545778d2eaa Mon Sep 17 00:00:00 2001
From: Tom
Date: Wed, 14 Feb 2024 20:31:55 -0800
Subject: [PATCH] Fix Flakiness in KafkaEmitterTest (#15907)
* thrust of the fix to allow for the json values to be out of order
The existing problem is that toMap doesn't turn some values into json primitive
values, for example segmentMetadata just has DateTime objects for it's time in
the EventMap, but Alert event converts those into strings when calling toMap.
This creates an issue because when we check the emitted events the mapper
deserializing the string value for dateTime leaves it as a string in the
EventMap. So the question is do we alter the events toMap() to return string/map
version of objects or to make the expected events do a round trip of
eventMap -> string -> eventMap to turn everything into json primitives
* fix issue by making toMap events convert Objects into strings, or maps
* fix linting errors
* use method of using mapper to round trip expected data to make it have same type
as those of the events emitted
* remove unnecessary comment
---
.../druid/emitter/kafka/KafkaEmitterTest.java | 80 ++++++++++---------
1 file changed, 41 insertions(+), 39 deletions(-)
diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index e7bd7ac47d4..a9f5e14fbea 100644
--- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -54,6 +54,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -135,7 +137,7 @@ public class KafkaEmitterTest
* events are emitted without any drops.
*/
@Test(timeout = 10_000)
- public void testServiceMetricEvents() throws JsonProcessingException, InterruptedException
+ public void testServiceMetricEvents() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -155,12 +157,12 @@ public class KafkaEmitterTest
final List inputEvents = flattenEvents(SERVICE_METRIC_EVENTS);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
- final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -180,7 +182,7 @@ public class KafkaEmitterTest
* events are emitted without any drops.
*/
@Test(timeout = 10_000)
- public void testAllEvents() throws JsonProcessingException, InterruptedException
+ public void testAllEvents() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -205,12 +207,12 @@ public class KafkaEmitterTest
);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
- final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -230,7 +232,7 @@ public class KafkaEmitterTest
* event types should be emitted without any drops.
*/
@Test(timeout = 10_000)
- public void testDefaultEvents() throws JsonProcessingException, InterruptedException
+ public void testDefaultEvents() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -253,12 +255,12 @@ public class KafkaEmitterTest
);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
- final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -278,7 +280,7 @@ public class KafkaEmitterTest
* should be emitted, and everything else should be dropped.
*/
@Test(timeout = 10_000)
- public void testAlertsPlusUnsubscribedEvents() throws JsonProcessingException, InterruptedException
+ public void testAlertsPlusUnsubscribedEvents() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -305,12 +307,12 @@ public class KafkaEmitterTest
final CountDownLatch eventLatch = new CountDownLatch(ALERT_EVENTS.size());
- final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
ALERT_EVENTS,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -335,7 +337,7 @@ public class KafkaEmitterTest
*
*/
@Test(timeout = 10_000)
- public void testAllEventsWithCommonTopic() throws JsonProcessingException, InterruptedException
+ public void testAllEventsWithCommonTopic() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -361,12 +363,12 @@ public class KafkaEmitterTest
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
- final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -386,7 +388,7 @@ public class KafkaEmitterTest
*
*/
@Test(timeout = 10_000)
- public void testUnknownEvents() throws JsonProcessingException, InterruptedException
+ public void testUnknownEvents() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -410,12 +412,12 @@ public class KafkaEmitterTest
final CountDownLatch eventLatch = new CountDownLatch(SERVICE_METRIC_EVENTS.size());
- final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map> feedToExpectedEvents = trackExpectedEventsPerFeed(
SERVICE_METRIC_EVENTS,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -443,7 +445,7 @@ public class KafkaEmitterTest
);
final ImmutableMap extraDimensions = ImmutableMap.of("clusterId", "cluster-101");
- final Map> feedToAllEventsBeforeDrop = trackExpectedEventsPerFeed(
+ final Map> feedToAllEventsBeforeDrop = trackExpectedEventsPerFeed(
inputEvents,
null,
extraDimensions
@@ -469,15 +471,15 @@ public class KafkaEmitterTest
// we should track the minimum buffer size per feed, compute the global maximum across all the feeds and prune the
// expected set of events accordingly. For the sake of testing simplicity, we skip that for now.
int totalBufferSize = 0;
- for (final List feedEvents : feedToAllEventsBeforeDrop.values()) {
+ for (final List feedEvents : feedToAllEventsBeforeDrop.values()) {
for (int idx = 0; idx < feedEvents.size() - bufferEventsDrop; idx++) {
- totalBufferSize += feedEvents.get(idx).getBytes(StandardCharsets.UTF_8).length;
+ totalBufferSize += MAPPER.writeValueAsString(feedEvents.get(idx)).getBytes(StandardCharsets.UTF_8).length;
}
}
- final Map> feedToExpectedEvents = new HashMap<>();
- for (final Map.Entry> expectedEvent : feedToAllEventsBeforeDrop.entrySet()) {
- List expectedEvents = expectedEvent.getValue();
+ final Map> feedToExpectedEvents = new HashMap<>();
+ for (final Map.Entry> expectedEvent : feedToAllEventsBeforeDrop.entrySet()) {
+ List expectedEvents = expectedEvent.getValue();
feedToExpectedEvents.put(expectedEvent.getKey(), expectedEvents.subList(0, expectedEvents.size() - bufferEventsDrop));
}
@@ -497,7 +499,7 @@ public class KafkaEmitterTest
final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size() - bufferEventsDrop);
- final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -554,19 +556,20 @@ public class KafkaEmitterTest
return flattenedList;
}
- private Map> trackActualEventsPerFeed(
+ private Map> trackActualEventsPerFeed(
final CountDownLatch eventLatch
)
{
+
// A concurrent hashmap because the producer callback can trigger concurrently and can override the map initialization
- final ConcurrentHashMap> feedToActualEvents = new ConcurrentHashMap<>();
+ final ConcurrentHashMap> feedToActualEvents = new ConcurrentHashMap<>();
when(producer.send(any(), any())).then((invocation) -> {
final ProducerRecord, ?> producerRecord = invocation.getArgument(0);
final String value = String.valueOf(producerRecord.value());
final EventMap eventMap = MAPPER.readValue(value, EventMap.class);
feedToActualEvents.computeIfAbsent(
(String) eventMap.get("feed"), k -> new ArrayList<>()
- ).add(value);
+ ).add(eventMap);
eventLatch.countDown();
return null;
@@ -574,38 +577,37 @@ public class KafkaEmitterTest
return feedToActualEvents;
}
- private Map> trackExpectedEventsPerFeed(
+ private Map> trackExpectedEventsPerFeed(
final List events,
final String clusterName,
final Map extraDimensions
) throws JsonProcessingException
{
- final Map> feedToExpectedEvents = new HashMap<>();
+ final Map> feedToExpectedEvents = new HashMap<>();
for (final Event event : events) {
- final EventMap eventMap = event.toMap();
+ final EventMap eventMap = MAPPER.readValue(MAPPER.writeValueAsString(event.toMap()), EventMap.class);
eventMap.computeIfAbsent("clusterName", k -> clusterName);
if (extraDimensions != null) {
eventMap.putAll(extraDimensions);
}
feedToExpectedEvents.computeIfAbsent(
- event.getFeed(), k -> new ArrayList<>()).add(MAPPER.writeValueAsString(eventMap)
- );
+ event.getFeed(), k -> new ArrayList<>()).add(eventMap);
}
return feedToExpectedEvents;
}
private void validateEvents(
- final Map> feedToExpectedEvents,
- final Map> feedToActualEvents
+ final Map> feedToExpectedEvents,
+ final Map> feedToActualEvents
)
{
Assert.assertEquals(feedToExpectedEvents.size(), feedToActualEvents.size());
- for (final Map.Entry> actualEntry : feedToActualEvents.entrySet()) {
+ for (final Map.Entry> actualEntry : feedToActualEvents.entrySet()) {
final String feed = actualEntry.getKey();
- final List actualEvents = actualEntry.getValue();
- final List expectedEvents = feedToExpectedEvents.get(feed);
- Assert.assertEquals(expectedEvents, actualEvents);
+ final List actualEvents = actualEntry.getValue();
+ final List expectedEvents = feedToExpectedEvents.get(feed);
+ assertThat(actualEvents, containsInAnyOrder(expectedEvents.toArray(new Map[0])));
}
}