From c324e377512bfa815e0e71fc3dbdb69934d9951e Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Wed, 14 Feb 2024 11:52:06 -0800 Subject: [PATCH] Add javadocs to `KafkaEmitterTest` & fix flaky test (#15898) * Address review comment: add test javadocs * Fix flaky assertion failure. Use ConcurrentHashMap instead of HashMap because the producer callback can trigger concurrently and override the map initialization. * fixup intellij inspection --- .../druid/emitter/kafka/KafkaEmitterTest.java | 48 ++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java index 74a412a203e..e7bd7ac47d4 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -51,6 +51,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import static org.mockito.ArgumentMatchers.any; @@ -128,6 +129,11 @@ public class KafkaEmitterTest new TestEvent() ); + /** + * Unit test to validate the handling of {@link ServiceMetricEvent}s. + * Only {@link KafkaEmitterConfig.EventType}s is subscribed in the config, so the expectation is that the + * events are emitted without any drops. + */ @Test(timeout = 10_000) public void testServiceMetricEvents() throws JsonProcessingException, InterruptedException { @@ -167,6 +173,12 @@ public class KafkaEmitterTest Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount()); } + /** + * Unit test to validate the handling of all event types, including {@link ServiceMetricEvent}, + * {@link AlertEvent}, {@link org.apache.druid.server.log.RequestLogEvent}, and {@link SegmentMetadataEvent}. + * All {@link KafkaEmitterConfig.EventType}s are subscribed in the config, so the expectation is that all the + * events are emitted without any drops. + */ @Test(timeout = 10_000) public void testAllEvents() throws JsonProcessingException, InterruptedException { @@ -212,6 +224,11 @@ public class KafkaEmitterTest } + /** + * Unit test to validate the handling of the default event types - {@link ServiceMetricEvent} and {@link AlertEvent}. + * The default event types (alerts and metrics) are subscribed in the config, so the expectation is that both input + * event types should be emitted without any drops. + */ @Test(timeout = 10_000) public void testDefaultEvents() throws JsonProcessingException, InterruptedException { @@ -254,6 +271,12 @@ public class KafkaEmitterTest Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount()); } + /** + * Unit test to validate the handling of all valid event types, including {@link ServiceMetricEvent}, + * {@link AlertEvent}, {@link org.apache.druid.server.log.RequestLogEvent}, and {@link SegmentMetadataEvent}. + * Only alerts are subscribed in the config, so the expectation is that only alert events + * should be emitted, and everything else should be dropped. + */ @Test(timeout = 10_000) public void testAlertsPlusUnsubscribedEvents() throws JsonProcessingException, InterruptedException { @@ -302,6 +325,15 @@ public class KafkaEmitterTest Assert.assertEquals(REQUEST_LOG_EVENTS.size(), kafkaEmitter.getRequestLostCount()); } + /** + * Similar to {@link #testAllEvents()}, this test configures all event feeds to emit to the same topic. + *

+ * Unit test to validate the handling of all valid event types, including {@link ServiceMetricEvent}, + * {@link AlertEvent}, {@link org.apache.druid.server.log.RequestLogEvent}, and {@link SegmentMetadataEvent}. + * All {@link KafkaEmitterConfig.EventType}s are subscribed to the same topic in the config, so the expectation + * is that all input events are emitted without any drops. + *

+ */ @Test(timeout = 10_000) public void testAllEventsWithCommonTopic() throws JsonProcessingException, InterruptedException { @@ -347,6 +379,12 @@ public class KafkaEmitterTest Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount()); } + /** + * Unit test to validate the handling of {@link ServiceMetricEvent}s and {@link TestEvent}s. + * The default event types (alerts and metrics) are subscribed in the config, so the expectation is that only + * {@link ServiceMetricEvent} is expected to be emitted, while dropping all unknown {@link TestEvent}s. + *

+ */ @Test(timeout = 10_000) public void testUnknownEvents() throws JsonProcessingException, InterruptedException { @@ -390,6 +428,13 @@ public class KafkaEmitterTest Assert.assertEquals(UNKNOWN_EVENTS.size(), kafkaEmitter.getInvalidLostCount()); } + /** + * Unit test to validate the handling of {@link ServiceMetricEvent}s when the Kafka emitter queue, which buffers up events + * becomes full. The queue size in the config is set via {@code buffer.memory} and is computed from + * the input events using {@code bufferEventsDrop}. The default event types (alerts and metrics) are subscribed in + * the config, so the expectation is that all {@link ServiceMetricEvent}s up to {@code n - bufferEventsDrop} will be + * emitted, {@code n} being the total number of input events, while dropping the last {@code bufferEventsDrop} events. + */ @Test(timeout = 10_000) public void testDropEventsWhenQueueFull() throws JsonProcessingException, InterruptedException { @@ -513,7 +558,8 @@ public class KafkaEmitterTest final CountDownLatch eventLatch ) { - final Map> feedToActualEvents = new HashMap<>(); + // A concurrent hashmap because the producer callback can trigger concurrently and can override the map initialization + final ConcurrentHashMap> feedToActualEvents = new ConcurrentHashMap<>(); when(producer.send(any(), any())).then((invocation) -> { final ProducerRecord producerRecord = invocation.getArgument(0); final String value = String.valueOf(producerRecord.value());