Clean up kafka emitter tests, add more validations and code coverage. (#15878)

* Clean up kafka emitter tests a bit and add more validations.

The test wasn't validating which events were actually sent; it only checked the dropped-event counters, which
aren't that useful on their own. The tests now capture the records handed to the Kafka producer and compare them,
per feed, against the expected events (see the sketch after this list).
Additionally, this module has relatively few tests, so folks often run into code coverage issues
in this extension. Hopefully this change helps with that too.

* Change event tracking in the tests to be feed-based rather than topic-based.

* Another test for shared topic

* Switch to DruidException, add test dependencies and sad path config tests.

* Add missing test dependency.

* Minor renames.

* Add more tests to cover unknown events and dropping events when the queue is full.
Abhishek Radhakrishnan 2024-02-12 13:22:19 -08:00 committed by GitHub
parent 7fea34abdd
commit 51fd79ee58
4 changed files with 706 additions and 86 deletions


@@ -116,6 +116,16 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>


@@ -22,9 +22,9 @@ package org.apache.druid.emitter.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;
@@ -95,12 +95,45 @@ public class KafkaEmitterConfig
@JsonProperty("producer.hiddenProperties") @Nullable DynamicConfigProvider<String> kafkaProducerSecrets
)
{
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "druid.emitter.kafka.bootstrap.servers can not be null");
this.eventTypes = maybeUpdateEventTypes(eventTypes, requestTopic);
this.metricTopic = this.eventTypes.contains(EventType.METRICS) ? Preconditions.checkNotNull(metricTopic, "druid.emitter.kafka.metric.topic can not be null") : null;
this.alertTopic = this.eventTypes.contains(EventType.ALERTS) ? Preconditions.checkNotNull(alertTopic, "druid.emitter.kafka.alert.topic can not be null") : null;
this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can not be null") : null;
this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
// Validate all required properties
if (bootstrapServers == null) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.NOT_FOUND)
.build("druid.emitter.kafka.bootstrap.servers must be specified.");
}
if (this.eventTypes.contains(EventType.METRICS) && metricTopic == null) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.NOT_FOUND)
.build("druid.emitter.kafka.metric.topic must be specified"
+ " if druid.emitter.kafka.event.types contains %s.", EventType.METRICS);
}
if (this.eventTypes.contains(EventType.ALERTS) && alertTopic == null) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.NOT_FOUND)
.build("druid.emitter.kafka.alert.topic must be specified"
+ " if druid.emitter.kafka.event.types contains %s.", EventType.ALERTS);
}
if (this.eventTypes.contains(EventType.REQUESTS) && requestTopic == null) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.NOT_FOUND)
.build("druid.emitter.kafka.request.topic must be specified"
+ " if druid.emitter.kafka.event.types contains %s.", EventType.REQUESTS);
}
if (this.eventTypes.contains(EventType.SEGMENT_METADATA) && segmentMetadataTopic == null) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.NOT_FOUND)
.build("druid.emitter.kafka.segmentMetadata.topic must be specified"
+ " if druid.emitter.kafka.event.types contains %s.", EventType.SEGMENT_METADATA);
}
this.bootstrapServers = bootstrapServers;
this.metricTopic = metricTopic;
this.alertTopic = alertTopic;
this.requestTopic = requestTopic;
this.segmentMetadataTopic = segmentMetadataTopic;
this.clusterName = clusterName;
this.extraDimensions = extraDimensions;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;


@@ -20,18 +20,21 @@
package org.apache.druid.emitter.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -41,12 +44,6 @@ public class KafkaEmitterConfigTest
ImmutableMap.of("testSecretKey", "testSecretValue"));
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@Before
public void setUp()
{
MAPPER.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()));
}
@Test
public void testSerDeserKafkaEmitterConfig() throws IOException
{
@@ -116,12 +113,24 @@ public class KafkaEmitterConfigTest
}
@Test
public void testSerDeNotRequiredKafkaProducerConfigOrKafkaSecretProducer()
public void testSerDeNotRequiredKafkaProducerConfigOrKafkaSecretProducer() throws JsonProcessingException
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", null, "metricTest",
"alertTest", null, "metadataTest",
"clusterNameTest", null, null, null
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"localhost:9092",
null,
"metricTest",
"alertTest",
null,
"metadataTest",
null,
ImmutableMap.of("env", "preProd"),
null,
null
);
String kafkaEmitterConfigString = MAPPER.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected = MAPPER.readerFor(KafkaEmitterConfig.class)
.readValue(kafkaEmitterConfigString);
Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
try {
@SuppressWarnings("unused")
KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, MAPPER);
@@ -153,4 +162,139 @@ public class KafkaEmitterConfigTest
{
Assert.assertTrue(new KafkaEmitterModule().getJacksonModules().isEmpty());
}
@Test
public void testNullBootstrapServers()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new KafkaEmitterConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
),
operatorExceptionMatcher()
.expectMessageIs("druid.emitter.kafka.bootstrap.servers must be specified.")
);
}
@Test
public void testNullMetricTopic()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new KafkaEmitterConfig(
"foo",
new HashSet<>(Collections.singletonList(KafkaEmitterConfig.EventType.METRICS)),
null,
null,
null,
null,
null,
null,
null,
null
)
),
operatorExceptionMatcher()
.expectMessageIs("druid.emitter.kafka.metric.topic must be specified if druid.emitter.kafka.event.types contains metrics.")
);
}
@Test
public void testNullAlertTopic()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new KafkaEmitterConfig(
"foo",
null,
"foo",
null,
null,
null,
null,
null,
null,
null
)
),
operatorExceptionMatcher()
.expectMessageIs(
"druid.emitter.kafka.alert.topic must be specified if druid.emitter.kafka.event.types contains alerts."
)
);
}
@Test
public void testNullRequestTopic()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new KafkaEmitterConfig(
"foo",
new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.values())),
"foo",
"foo",
null,
null,
null,
null,
null,
null
)
),
operatorExceptionMatcher()
.expectMessageIs(
"druid.emitter.kafka.request.topic must be specified if druid.emitter.kafka.event.types contains requests."
)
);
}
@Test
public void testNullSegmentMetadataTopic()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new KafkaEmitterConfig(
"foo",
new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.values())),
"foo",
"bar",
"baz",
null,
null,
null,
null,
null
)
),
operatorExceptionMatcher()
.expectMessageIs(
"druid.emitter.kafka.segmentMetadata.topic must be specified if druid.emitter.kafka.event.types contains segment_metadata."
)
);
}
private DruidExceptionMatcher operatorExceptionMatcher()
{
return new DruidExceptionMatcher(
DruidException.Persona.OPERATOR,
DruidException.Category.NOT_FOUND,
"general"
);
}
}


@@ -19,95 +19,459 @@
package org.apache.druid.emitter.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class KafkaEmitterTest
{
@Parameterized.Parameter(0)
public Set<KafkaEmitterConfig.EventType> eventsType;
private KafkaProducer<String, String> producer;
@Parameterized.Parameter(1)
public String requestTopic;
@Parameterized.Parameters(name = "{index}: eventTypes - {0}, requestTopic - {1}")
public static Object[] data()
@Before
public void setup()
{
return new Object[][] {
{new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENT_METADATA)), "requests"},
{new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENT_METADATA)), null}
};
producer = mock(KafkaProducer.class);
}
// There is a one-second wait in the Kafka emitter before it starts sending events to the broker, so set a 10-second timeout.
@Test(timeout = 10_000)
public void testKafkaEmitter() throws InterruptedException
@After
public void tearDown()
{
final List<ServiceMetricEvent> serviceMetricEvents = ImmutableList.of(
ServiceMetricEvent.builder().setMetric("m1", 1).build("service", "host")
producer.close();
}
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
private static final List<Event> SERVICE_METRIC_EVENTS = ImmutableList.of(
ServiceMetricEvent.builder().setMetric("m1", 1).build("service1", "host1"),
ServiceMetricEvent.builder().setMetric("m2", 100).build("service2", "host1"),
ServiceMetricEvent.builder().setMetric("m3", 200).build("service2", "host1"),
ServiceMetricEvent.builder().setMetric("m1", 150).build("service2", "host2"),
ServiceMetricEvent.builder().setMetric("m5", 250).build("service3", "host2")
);
private static final List<Event> ALERT_EVENTS = ImmutableList.of(
new AlertEvent("service1", "host1", "description"),
new AlertEvent("service2", "host2", "description")
);
private static final List<Event> REQUEST_LOG_EVENTS = ImmutableList.of(
DefaultRequestLogEventBuilderFactory.instance().createRequestLogEventBuilder(
"requests",
RequestLogLine.forSql(
"sql1", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of())
)
).build("service", "host"),
DefaultRequestLogEventBuilderFactory.instance().createRequestLogEventBuilder(
"requests",
RequestLogLine.forSql(
"sql2", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of())
)
).build("service", "host")
);
private static final List<Event> SEGMENT_METADATA_EVENTS = ImmutableList.of(
new SegmentMetadataEvent(
"ds1",
DateTimes.of("2001-01-01T00:00:00.000Z"),
DateTimes.of("2001-01-02T00:00:00.000Z"),
DateTimes.of("2001-01-03T00:00:00.000Z"),
"ds1",
true
),
new SegmentMetadataEvent(
"ds2",
DateTimes.of("2020-01-01T00:00:00.000Z"),
DateTimes.of("2020-01-02T00:00:00.000Z"),
DateTimes.of("2020-01-03T00:00:00.000Z"),
"ds2",
true
)
);
private static final List<Event> UNKNOWN_EVENTS = ImmutableList.of(
new TestEvent(),
new TestEvent(),
new TestEvent()
);
@Test(timeout = 10_000)
public void testServiceMetricEvents() throws JsonProcessingException, InterruptedException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
new HashSet<>(Collections.singletonList(KafkaEmitterConfig.EventType.METRICS)),
"metrics",
"alerts",
"requests",
"segments",
"clusterName",
ImmutableMap.of("clusterId", "cluster-101"),
null,
null
);
final List<AlertEvent> alertEvents = ImmutableList.of(
new AlertEvent("service", "host", "description")
final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
final List<Event> inputEvents = flattenEvents(SERVICE_METRIC_EVENTS);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
validateEvents(feedToExpectedEvents, feedToActualEvents);
Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
Assert.assertEquals(0, kafkaEmitter.getRequestLostCount());
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
}
@Test(timeout = 10_000)
public void testAllEvents() throws JsonProcessingException, InterruptedException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.values())),
"metrics",
"alerts",
"requests",
"segments",
null,
ImmutableMap.of("clusterId", "cluster-101", "env", "staging"),
null,
null
);
final List<RequestLogEvent> requestLogEvents = ImmutableList.of(
DefaultRequestLogEventBuilderFactory.instance().createRequestLogEventBuilder("requests",
RequestLogLine.forSql("", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of()))
).build("service", "host")
final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
final List<Event> inputEvents = flattenEvents(
SERVICE_METRIC_EVENTS,
ALERT_EVENTS,
REQUEST_LOG_EVENTS,
SEGMENT_METADATA_EVENTS
);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
validateEvents(feedToExpectedEvents, feedToActualEvents);
Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
Assert.assertEquals(0, kafkaEmitter.getRequestLostCount());
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
}
@Test(timeout = 10_000)
public void testDefaultEvents() throws JsonProcessingException, InterruptedException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
KafkaEmitterConfig.DEFAULT_EVENT_TYPES,
"metrics",
"alerts",
"requests",
"segment_metadata",
"clusterName",
null,
null,
null
);
final List<SegmentMetadataEvent> segmentMetadataEvents = ImmutableList.of(
new SegmentMetadataEvent(
"dummy_datasource",
DateTimes.of("2001-01-01T00:00:00.000Z"),
DateTimes.of("2001-01-02T00:00:00.000Z"),
DateTimes.of("2001-01-03T00:00:00.000Z"),
"dummy_version",
true
)
final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
final List<Event> inputEvents = flattenEvents(
SERVICE_METRIC_EVENTS,
ALERT_EVENTS
);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
validateEvents(feedToExpectedEvents, feedToActualEvents);
Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
Assert.assertEquals(0, kafkaEmitter.getRequestLostCount());
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
}
@Test(timeout = 10_000)
public void testAlertsPlusUnsubscribedEvents() throws JsonProcessingException, InterruptedException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
new HashSet<>(Collections.singletonList(KafkaEmitterConfig.EventType.ALERTS)),
"metrics",
"alerts",
"requests",
"segment_metadata",
"clusterName",
null,
null,
null
);
int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size() + segmentMetadataEvents.size();
int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size();
final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
final CountDownLatch countDownSentEvents = new CountDownLatch(
requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
// Emit everything. Since we only subscribe to alert feeds, everything else should be dropped.
final List<Event> inputEvents = flattenEvents(
SERVICE_METRIC_EVENTS,
ALERT_EVENTS,
SEGMENT_METADATA_EVENTS,
REQUEST_LOG_EVENTS
);
final KafkaProducer<String, String> producer = mock(KafkaProducer.class);
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JodaModule());
final KafkaEmitter kafkaEmitter = new KafkaEmitter(
new KafkaEmitterConfig("", eventsType, "metrics", "alerts", requestTopic, "metadata", "test-cluster", ImmutableMap.of("clusterId", "cluster-101"), null, null),
mapper
final CountDownLatch eventLatch = new CountDownLatch(ALERT_EVENTS.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
ALERT_EVENTS,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
validateEvents(feedToExpectedEvents, feedToActualEvents);
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
// Others would be dropped as we've only subscribed to alert events.
Assert.assertEquals(SERVICE_METRIC_EVENTS.size(), kafkaEmitter.getMetricLostCount());
Assert.assertEquals(SEGMENT_METADATA_EVENTS.size(), kafkaEmitter.getSegmentMetadataLostCount());
Assert.assertEquals(REQUEST_LOG_EVENTS.size(), kafkaEmitter.getRequestLostCount());
}
@Test(timeout = 10_000)
public void testAllEventsWithCommonTopic() throws JsonProcessingException, InterruptedException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.values())),
"topic",
"topic",
"topic",
"topic",
null,
null,
null,
null
);
final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
final List<Event> inputEvents = flattenEvents(
SERVICE_METRIC_EVENTS,
ALERT_EVENTS,
SEGMENT_METADATA_EVENTS,
REQUEST_LOG_EVENTS
);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
inputEvents,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
validateEvents(feedToExpectedEvents, feedToActualEvents);
Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
Assert.assertEquals(0, kafkaEmitter.getRequestLostCount());
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
}
@Test(timeout = 10_000)
public void testUnknownEvents() throws JsonProcessingException, InterruptedException
{
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
KafkaEmitterConfig.DEFAULT_EVENT_TYPES,
"topic",
"topic",
null,
null,
"cluster-102",
ImmutableMap.of("clusterName", "cluster-101", "env", "staging"), // clusterName again, extraDimensions should take precedence
null,
null
);
final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
final List<Event> inputEvents = flattenEvents(
UNKNOWN_EVENTS,
SERVICE_METRIC_EVENTS
);
final CountDownLatch eventLatch = new CountDownLatch(SERVICE_METRIC_EVENTS.size());
final Map<String, List<String>> feedToExpectedEvents = trackExpectedEventsPerFeed(
SERVICE_METRIC_EVENTS,
kafkaEmitterConfig.getClusterName(),
kafkaEmitterConfig.getExtraDimensions()
);
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
validateEvents(feedToExpectedEvents, feedToActualEvents);
Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
Assert.assertEquals(0, kafkaEmitter.getRequestLostCount());
Assert.assertEquals(UNKNOWN_EVENTS.size(), kafkaEmitter.getInvalidLostCount());
}
@Test(timeout = 10_000)
public void testDropEventsWhenQueueFull() throws JsonProcessingException, InterruptedException
{
final List<Event> inputEvents = flattenEvents(
SERVICE_METRIC_EVENTS
);
final ImmutableMap<String, String> extraDimensions = ImmutableMap.of("clusterId", "cluster-101");
final Map<String, List<String>> feedToAllEventsBeforeDrop = trackExpectedEventsPerFeed(
inputEvents,
null,
extraDimensions
);
final int bufferEventsDrop = 3;
Assert.assertTrue(
StringUtils.format(
"Total events to emit: %d. There must at least be %d events to drop.",
inputEvents.size(),
bufferEventsDrop
),
inputEvents.size() - bufferEventsDrop > 0
);
Assert.assertEquals(
"Currently the test only supports having 1 feed",
1,
feedToAllEventsBeforeDrop.size()
);
// Note: this only accounts for one feed currently. If we want to test the queuing behavior across all feeds,
// we should track the minimum buffer size per feed, compute the global maximum across all the feeds and prune the
// expected set of events accordingly. For the sake of testing simplicity, we skip that for now.
int totalBufferSize = 0;
for (final List<String> feedEvents : feedToAllEventsBeforeDrop.values()) {
for (int idx = 0; idx < feedEvents.size() - bufferEventsDrop; idx++) {
totalBufferSize += feedEvents.get(idx).getBytes(StandardCharsets.UTF_8).length;
}
}
final Map<String, List<String>> feedToExpectedEvents = new HashMap<>();
for (final Map.Entry<String, List<String>> expectedEvent : feedToAllEventsBeforeDrop.entrySet()) {
List<String> expectedEvents = expectedEvent.getValue();
feedToExpectedEvents.put(expectedEvent.getKey(), expectedEvents.subList(0, expectedEvents.size() - bufferEventsDrop));
}
final KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
"",
KafkaEmitterConfig.DEFAULT_EVENT_TYPES,
"metrics",
"alerts",
"requests",
"segments",
null,
extraDimensions,
ImmutableMap.of(ProducerConfig.BUFFER_MEMORY_CONFIG, String.valueOf(totalBufferSize)),
null
);
final KafkaEmitter kafkaEmitter = initKafkaEmitter(kafkaEmitterConfig);
final CountDownLatch eventLatch = new CountDownLatch(inputEvents.size() - bufferEventsDrop);
final Map<String, List<String>> feedToActualEvents = trackActualEventsPerFeed(eventLatch);
emitEvents(kafkaEmitter, inputEvents, eventLatch);
validateEvents(feedToExpectedEvents, feedToActualEvents);
Assert.assertEquals(bufferEventsDrop, kafkaEmitter.getMetricLostCount());
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
Assert.assertEquals(0, kafkaEmitter.getRequestLostCount());
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
}
private KafkaEmitter initKafkaEmitter(
final KafkaEmitterConfig kafkaEmitterConfig
)
{
return new KafkaEmitter(
kafkaEmitterConfig,
new DefaultObjectMapper()
)
{
@Override
@@ -118,34 +482,103 @@ public class KafkaEmitterTest
return producer;
}
};
}
when(producer.send(any(), any())).then((invocation) -> {
countDownSentEvents.countDown();
return null;
});
private void emitEvents(
final KafkaEmitter kafkaEmitter,
final List<Event> emitEvents,
final CountDownLatch eventLatch
) throws InterruptedException
{
kafkaEmitter.start();
for (Event event : serviceMetricEvents) {
for (final Event event : emitEvents) {
kafkaEmitter.emit(event);
}
for (Event event : alertEvents) {
kafkaEmitter.emit(event);
}
for (Event event : requestLogEvents) {
kafkaEmitter.emit(event);
}
for (Event event : segmentMetadataEvents) {
kafkaEmitter.emit(event);
}
countDownSentEvents.await();
eventLatch.await();
kafkaEmitter.close();
}
Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getRequestLostCount());
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
private List<Event> flattenEvents(List<Event>... eventLists)
{
final List<Event> flattenedList = new ArrayList<>();
for (List<Event> events : eventLists) {
flattenedList.addAll(events);
}
return flattenedList;
}
private Map<String, List<String>> trackActualEventsPerFeed(
final CountDownLatch eventLatch
)
{
final Map<String, List<String>> feedToActualEvents = new HashMap<>();
when(producer.send(any(), any())).then((invocation) -> {
final ProducerRecord<?, ?> producerRecord = invocation.getArgument(0);
final String value = String.valueOf(producerRecord.value());
final EventMap eventMap = MAPPER.readValue(value, EventMap.class);
feedToActualEvents.computeIfAbsent(
(String) eventMap.get("feed"), k -> new ArrayList<>()
).add(value);
eventLatch.countDown();
return null;
});
return feedToActualEvents;
}
private Map<String, List<String>> trackExpectedEventsPerFeed(
final List<Event> events,
final String clusterName,
final Map<String, String> extraDimensions
) throws JsonProcessingException
{
final Map<String, List<String>> feedToExpectedEvents = new HashMap<>();
for (final Event event : events) {
final EventMap eventMap = event.toMap();
eventMap.computeIfAbsent("clusterName", k -> clusterName);
if (extraDimensions != null) {
eventMap.putAll(extraDimensions);
}
feedToExpectedEvents.computeIfAbsent(
event.getFeed(), k -> new ArrayList<>()).add(MAPPER.writeValueAsString(eventMap)
);
}
return feedToExpectedEvents;
}
private void validateEvents(
final Map<String, List<String>> feedToExpectedEvents,
final Map<String, List<String>> feedToActualEvents
)
{
Assert.assertEquals(feedToExpectedEvents.size(), feedToActualEvents.size());
for (final Map.Entry<String, List<String>> actualEntry : feedToActualEvents.entrySet()) {
final String feed = actualEntry.getKey();
final List<String> actualEvents = actualEntry.getValue();
final List<String> expectedEvents = feedToExpectedEvents.get(feed);
Assert.assertEquals(expectedEvents, actualEvents);
}
}
private static class TestEvent implements Event
{
TestEvent()
{
}
@Override
public EventMap toMap()
{
return new EventMap();
}
@Override
public String getFeed()
{
return "testFeed";
}
}
}