mirror of https://github.com/apache/druid.git
Add javadocs to `KafkaEmitterTest` & fix flaky test (#15898)
* Address review comment: add test javadocs * Fix flaky assertion failure. Use ConcurrentHashMap instead of HashMap because the producer callback can trigger concurrently and override the map initialization. * fixup intellij inspection
This commit is contained in:
parent
be0ee2ee33
commit
c324e37751
|
@ -51,6 +51,7 @@ import java.util.HashMap;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
|
@ -128,6 +129,11 @@ public class KafkaEmitterTest
|
|||
new TestEvent()
|
||||
);
|
||||
|
||||
/**
|
||||
* Unit test to validate the handling of {@link ServiceMetricEvent}s.
|
||||
* Only {@link KafkaEmitterConfig.EventType}s is subscribed in the config, so the expectation is that the
|
||||
* events are emitted without any drops.
|
||||
*/
|
||||
@Test(timeout = 10_000)
|
||||
public void testServiceMetricEvents() throws JsonProcessingException, InterruptedException
|
||||
{
|
||||
|
@ -167,6 +173,12 @@ public class KafkaEmitterTest
|
|||
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* Unit test to validate the handling of all event types, including {@link ServiceMetricEvent},
|
||||
* {@link AlertEvent}, {@link org.apache.druid.server.log.RequestLogEvent}, and {@link SegmentMetadataEvent}.
|
||||
* All {@link KafkaEmitterConfig.EventType}s are subscribed in the config, so the expectation is that all the
|
||||
* events are emitted without any drops.
|
||||
*/
|
||||
@Test(timeout = 10_000)
|
||||
public void testAllEvents() throws JsonProcessingException, InterruptedException
|
||||
{
|
||||
|
@ -212,6 +224,11 @@ public class KafkaEmitterTest
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Unit test to validate the handling of the default event types - {@link ServiceMetricEvent} and {@link AlertEvent}.
|
||||
* The default event types (alerts and metrics) are subscribed in the config, so the expectation is that both input
|
||||
* event types should be emitted without any drops.
|
||||
*/
|
||||
@Test(timeout = 10_000)
|
||||
public void testDefaultEvents() throws JsonProcessingException, InterruptedException
|
||||
{
|
||||
|
@ -254,6 +271,12 @@ public class KafkaEmitterTest
|
|||
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* Unit test to validate the handling of all valid event types, including {@link ServiceMetricEvent},
|
||||
* {@link AlertEvent}, {@link org.apache.druid.server.log.RequestLogEvent}, and {@link SegmentMetadataEvent}.
|
||||
* Only alerts are subscribed in the config, so the expectation is that only alert events
|
||||
* should be emitted, and everything else should be dropped.
|
||||
*/
|
||||
@Test(timeout = 10_000)
|
||||
public void testAlertsPlusUnsubscribedEvents() throws JsonProcessingException, InterruptedException
|
||||
{
|
||||
|
@ -302,6 +325,15 @@ public class KafkaEmitterTest
|
|||
Assert.assertEquals(REQUEST_LOG_EVENTS.size(), kafkaEmitter.getRequestLostCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar to {@link #testAllEvents()}, this test configures all event feeds to emit to the same topic.
|
||||
* <p>
|
||||
* Unit test to validate the handling of all valid event types, including {@link ServiceMetricEvent},
|
||||
* {@link AlertEvent}, {@link org.apache.druid.server.log.RequestLogEvent}, and {@link SegmentMetadataEvent}.
|
||||
* All {@link KafkaEmitterConfig.EventType}s are subscribed to the same topic in the config, so the expectation
|
||||
* is that all input events are emitted without any drops.
|
||||
* </p>
|
||||
*/
|
||||
@Test(timeout = 10_000)
|
||||
public void testAllEventsWithCommonTopic() throws JsonProcessingException, InterruptedException
|
||||
{
|
||||
|
@ -347,6 +379,12 @@ public class KafkaEmitterTest
|
|||
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* Unit test to validate the handling of {@link ServiceMetricEvent}s and {@link TestEvent}s.
|
||||
* The default event types (alerts and metrics) are subscribed in the config, so the expectation is that only
|
||||
* {@link ServiceMetricEvent} is expected to be emitted, while dropping all unknown {@link TestEvent}s.
|
||||
* </p>
|
||||
*/
|
||||
@Test(timeout = 10_000)
|
||||
public void testUnknownEvents() throws JsonProcessingException, InterruptedException
|
||||
{
|
||||
|
@ -390,6 +428,13 @@ public class KafkaEmitterTest
|
|||
Assert.assertEquals(UNKNOWN_EVENTS.size(), kafkaEmitter.getInvalidLostCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* Unit test to validate the handling of {@link ServiceMetricEvent}s when the Kafka emitter queue, which buffers up events
|
||||
* becomes full. The queue size in the config is set via {@code buffer.memory} and is computed from
|
||||
* the input events using {@code bufferEventsDrop}. The default event types (alerts and metrics) are subscribed in
|
||||
* the config, so the expectation is that all {@link ServiceMetricEvent}s up to {@code n - bufferEventsDrop} will be
|
||||
* emitted, {@code n} being the total number of input events, while dropping the last {@code bufferEventsDrop} events.
|
||||
*/
|
||||
@Test(timeout = 10_000)
|
||||
public void testDropEventsWhenQueueFull() throws JsonProcessingException, InterruptedException
|
||||
{
|
||||
|
@ -513,7 +558,8 @@ public class KafkaEmitterTest
|
|||
final CountDownLatch eventLatch
|
||||
)
|
||||
{
|
||||
final Map<String, List<String>> feedToActualEvents = new HashMap<>();
|
||||
// A concurrent hashmap because the producer callback can trigger concurrently and can override the map initialization
|
||||
final ConcurrentHashMap<String, List<String>> feedToActualEvents = new ConcurrentHashMap<>();
|
||||
when(producer.send(any(), any())).then((invocation) -> {
|
||||
final ProducerRecord<?, ?> producerRecord = invocation.getArgument(0);
|
||||
final String value = String.valueOf(producerRecord.value());
|
||||
|
|
Loading…
Reference in New Issue