diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index e7bd7ac47d4..a9f5e14fbea 100644
--- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -54,6 +54,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -135,7 +137,7 @@ public class KafkaEmitterTest
* events are emitted without any drops.
*/
@Test(timeout = 10_000)
- public void testServiceMetricEvents() throws JsonProcessingException, InterruptedException
+ public void testServiceMetricEvents() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -155,12 +157,12 @@ public class KafkaEmitterTest
final List<Event> inputEvents = flattenEvents(SERVICE_METRIC_EVENTS);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
- final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -180,7 +182,7 @@ public class KafkaEmitterTest
* events are emitted without any drops.
*/
@Test(timeout = 10_000)
- public void testAllEvents() throws JsonProcessingException, InterruptedException
+ public void testAllEvents() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -205,12 +207,12 @@ public class KafkaEmitterTest
);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
- final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -230,7 +232,7 @@ public class KafkaEmitterTest
* event types should be emitted without any drops.
*/
@Test(timeout = 10_000)
- public void testDefaultEvents() throws JsonProcessingException, InterruptedException
+ public void testDefaultEvents() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -253,12 +255,12 @@ public class KafkaEmitterTest
);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
- final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -278,7 +280,7 @@ public class KafkaEmitterTest
* should be emitted, and everything else should be dropped.
*/
@Test(timeout = 10_000)
- public void testAlertsPlusUnsubscribedEvents() throws JsonProcessingException, InterruptedException
+ public void testAlertsPlusUnsubscribedEvents() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -305,12 +307,12 @@ public class KafkaEmitterTest
final CountDownLatch eventLatch = new CountDownLatch(ALERT_EVENTS.size());
- final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
ALERT_EVENTS,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -335,7 +337,7 @@ public class KafkaEmitterTest
*
*/
@Test(timeout = 10_000)
- public void testAllEventsWithCommonTopic() throws JsonProcessingException, InterruptedException
+ public void testAllEventsWithCommonTopic() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -361,12 +363,12 @@ public class KafkaEmitterTest
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
- final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -386,7 +388,7 @@ public class KafkaEmitterTest
*
*/
@Test(timeout = 10_000)
- public void testUnknownEvents() throws JsonProcessingException, InterruptedException
+ public void testUnknownEvents() throws InterruptedException, JsonProcessingException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
@@ -410,12 +412,12 @@ public class KafkaEmitterTest
final CountDownLatch eventLatch = new CountDownLatch(SERVICE_METRIC_EVENTS.size());
- final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
+ final Map<String, List<EventMap>> feedToExpectedEvents = trackExpectedEventsPerFeed(
SERVICE_METRIC_EVENTS,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
- final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -443,7 +445,7 @@ public class KafkaEmitterTest
);
final ImmutableMap<String, String> extraDimensions = ImmutableMap.of("clusterId", "cluster-101");
- final Map<String, List<String>> feedToAllEventsBeforeDrop = trackExpectedEventsPerFeed(
+ final Map<String, List<EventMap>> feedToAllEventsBeforeDrop = trackExpectedEventsPerFeed(
inputEvents,
null,
extraDimensions
@@ -469,15 +471,15 @@ public class KafkaEmitterTest
// we should track the minimum buffer size per feed, compute the global maximum across all the feeds and prune the
// expected set of events accordingly. For the sake of testing simplicity, we skip that for now.
int totalBufferSize = 0;
- for (final List<String> feedEvents : feedToAllEventsBeforeDrop.values()) {
+ for (final List<EventMap> feedEvents : feedToAllEventsBeforeDrop.values()) {
for (int idx = 0; idx < feedEvents.size() - bufferEventsDrop; idx++) {
- totalBufferSize += feedEvents.get(idx).getBytes(StandardCharsets.UTF_8).length;
+ totalBufferSize += MAPPER.writeValueAsString(feedEvents.get(idx)).getBytes(StandardCharsets.UTF_8).length;
}
}
- final Map<String, List<String>> feedToExpectedEvents = new HashMap<>();
- for (final Map.Entry<String, List<String>> expectedEvent : feedToAllEventsBeforeDrop.entrySet()) {
- List<String> expectedEvents = expectedEvent.getValue();
+ final Map<String, List<EventMap>> feedToExpectedEvents = new HashMap<>();
+ for (final Map.Entry<String, List<EventMap>> expectedEvent : feedToAllEventsBeforeDrop.entrySet()) {
+ List<EventMap> expectedEvents = expectedEvent.getValue();
feedToExpectedEvents.put(expectedEvent.getKey(), expectedEvents.subList(0, expectedEvents.size() - bufferEventsDrop));
}
@@ -497,7 +499,7 @@ public class KafkaEmitterTest
final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size() - bufferEventsDrop);
- final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
+ final Map<String, List<EventMap>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
@@ -554,19 +556,20 @@ public class KafkaEmitterTest
return flattenedList;
}
- private Map<String, List<String>> trackActualEventsPerFeed(
+ private Map<String, List<EventMap>> trackActualEventsPerFeed(
final CountDownLatch eventLatch
)
{
+
// A concurrent hashmap because the producer callback can trigger concurrently and can override the map initialization
- final ConcurrentHashMap<String, List<String>> feedToActualEvents = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<String, List<EventMap>> feedToActualEvents = new ConcurrentHashMap<>();
when(producer.send(any(), any())).then((invocation) -> {
final ProducerRecord<?, ?> producerRecord = invocation.getArgument(0);
final String value = String.valueOf(producerRecord.value());
final EventMap eventMap = MAPPER.readValue(value, EventMap.class);
feedToActualEvents.computeIfAbsent(
(String) eventMap.get("feed"), k -> new ArrayList<>()
- ).add(value);
+ ).add(eventMap);
eventLatch.countDown();
return null;
@@ -574,38 +577,37 @@ public class KafkaEmitterTest
return feedToActualEvents;
}
- private Map<String, List<String>> trackExpectedEventsPerFeed(
+ private Map<String, List<EventMap>> trackExpectedEventsPerFeed(
final List<Event> events,
final String clusterName,
final Map<String, String> extraDimensions
) throws JsonProcessingException
{
- final Map<String, List<String>> feedToExpectedEvents = new HashMap<>();
+ final Map<String, List<EventMap>> feedToExpectedEvents = new HashMap<>();
for (final Event event : events) {
- final EventMap eventMap = event.toMap();
+ final EventMap eventMap = MAPPER.readValue(MAPPER.writeValueAsString(event.toMap()), EventMap.class);
eventMap.computeIfAbsent("clusterName", k -> clusterName);
if (extraDimensions != null) {
eventMap.putAll(extraDimensions);
}
feedToExpectedEvents.computeIfAbsent(
- event.getFeed(), k -> new ArrayList<>()).add(MAPPER.writeValueAsString(eventMap)
- );
+ event.getFeed(), k -> new ArrayList<>()).add(eventMap);
}
return feedToExpectedEvents;
}
private void validateEvents(
- final Map<String, List<String>> feedToExpectedEvents,
- final Map<String, List<String>> feedToActualEvents
+ final Map<String, List<EventMap>> feedToExpectedEvents,
+ final Map<String, List<EventMap>> feedToActualEvents
)
{
Assert.assertEquals(feedToExpectedEvents.size(), feedToActualEvents.size());
- for (final Map.Entry<String, List<String>> actualEntry : feedToActualEvents.entrySet()) {
+ for (final Map.Entry<String, List<EventMap>> actualEntry : feedToActualEvents.entrySet()) {
final String feed = actualEntry.getKey();
- final List<String> actualEvents = actualEntry.getValue();
- final List<String> expectedEvents = feedToExpectedEvents.get(feed);
- Assert.assertEquals(expectedEvents, actualEvents);
+ final List<EventMap> actualEvents = actualEntry.getValue();
+ final List<EventMap> expectedEvents = feedToExpectedEvents.get(feed);
+ assertThat(actualEvents, containsInAnyOrder(expectedEvents.toArray(new Map[0])));
}
}