request logs through kafka emitter (#11036)

* request logs through kafka emitter

* travis fixes

* review comments

* kafka emitter unit test

* new line

* travis checks

* checkstyle fix

* count request lost when request topic is null
Parag Jain 2021-04-01 11:31:32 +05:30 committed by GitHub
parent 071b6f5685
commit b35486fa81
8 changed files with 327 additions and 12 deletions


@@ -41,6 +41,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter.
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated list of Kafka brokers: `[hostname:port],[hostname:port]...`|yes|none|
|`druid.emitter.kafka.metric.topic`|Kafka topic to which the emitter sends service metrics.|yes|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic to which the emitter sends alerts.|yes|none|
|`druid.emitter.kafka.request.topic`|Kafka topic to which the emitter sends request logs. If unset, request logs are not sent to Kafka.|no|none|
|`druid.emitter.kafka.producer.config`|JSON-formatted map of additional properties to set on the Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional name of your Druid cluster, useful for grouping metrics in your monitoring environment.|no|none|
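
For illustration, a minimal `runtime.properties` sketch using these settings together with Druid's standard `emitter` request logger, which produces the request log events this commit routes to Kafka (broker addresses, topic names, and the feed name below are placeholders):

# load the extension alongside any extensions you already use
druid.extensions.loadList=["kafka-emitter"]
druid.emitter=kafka
druid.emitter.kafka.bootstrap.servers=kafka01:9092,kafka02:9092
druid.emitter.kafka.metric.topic=druid-metrics
druid.emitter.kafka.alert.topic=druid-alerts
druid.emitter.kafka.request.topic=druid-requests
druid.emitter.kafka.clusterName=demo-cluster
druid.emitter.kafka.producer.config={"max.block.ms":10000}
# request logs only reach the emitter when emitter-based request logging is enabled
druid.request.logging.type=emitter
druid.request.logging.feed=requestLogFeed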


@@ -45,6 +45,12 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>


@@ -21,6 +21,7 @@ package org.apache.druid.emitter.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.StringUtils;
@@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -51,6 +53,7 @@ public class KafkaEmitter implements Emitter
private static final int DEFAULT_RETRIES = 3;
private final AtomicLong metricLost;
private final AtomicLong alertLost;
private final AtomicLong requestLost;
private final AtomicLong invalidLost;
private final KafkaEmitterConfig config;
@@ -58,6 +61,7 @@ public class KafkaEmitter implements Emitter
private final ObjectMapper jsonMapper;
private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
private final ScheduledExecutorService scheduler;
public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
@@ -70,9 +74,11 @@ public class KafkaEmitter implements Emitter
.getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"));
this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.scheduler = Executors.newScheduledThreadPool(3);
this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.scheduler = Executors.newScheduledThreadPool(4);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
this.requestLost = new AtomicLong(0L);
this.invalidLost = new AtomicLong(0L);
}
@@ -86,7 +92,8 @@ public class KafkaEmitter implements Emitter
};
}
private Producer<String, String> setKafkaProducer()
@VisibleForTesting
protected Producer<String, String> setKafkaProducer()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
@@ -111,9 +118,13 @@ public class KafkaEmitter implements Emitter
{
scheduler.schedule(this::sendMetricToKafka, 10, TimeUnit.SECONDS);
scheduler.schedule(this::sendAlertToKafka, 10, TimeUnit.SECONDS);
if (config.getRequestTopic() != null) {
scheduler.schedule(this::sendRequestToKafka, 10, TimeUnit.SECONDS);
}
scheduler.scheduleWithFixedDelay(() -> {
log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]",
metricLost.get(), alertLost.get(), invalidLost.get());
log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]",
metricLost.get(), alertLost.get(), requestLost.get(), invalidLost.get()
);
}, 5, 5, TimeUnit.MINUTES);
log.info("Starting Kafka Emitter.");
}
@@ -128,7 +139,13 @@ public class KafkaEmitter implements Emitter
sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
}
private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
private void sendRequestToKafka()
{
sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost));
}
@VisibleForTesting
protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
{
ObjectContainer<String> objectToSend;
try {
@@ -166,6 +183,10 @@ public class KafkaEmitter implements Emitter
if (!alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else if (event instanceof RequestLogEvent) {
if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
requestLost.incrementAndGet();
}
} else {
invalidLost.incrementAndGet();
}
@@ -189,4 +210,24 @@ public class KafkaEmitter implements Emitter
scheduler.shutdownNow();
producer.close();
}
public long getMetricLostCount()
{
return metricLost.get();
}
public long getAlertLostCount()
{
return alertLost.get();
}
public long getRequestLostCount()
{
return requestLost.get();
}
public long getInvalidLostCount()
{
return invalidLost.get();
}
}
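
The change above follows the emitter's existing drop-counting pattern: each event type gets its own memory-bound queue, and an event that cannot be enqueued, or a request log arriving while `request.topic` is unset, increments a lost counter instead of blocking the caller. A self-contained sketch of that pattern, with a plain count-bounded `BlockingQueue` standing in for Druid's `MemoryBoundLinkedBlockingQueue`:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class BoundedSink
{
  // illustrative capacity; the real emitter bounds the queue by memory, not element count
  private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
  private final AtomicLong lost = new AtomicLong(0L);
  private final String topic; // null mirrors an unset request.topic

  BoundedSink(String topic)
  {
    this.topic = topic;
  }

  void emit(String serializedEvent)
  {
    // same rule as KafkaEmitter.emit: no topic, or a full queue, counts the event as lost
    if (topic == null || !queue.offer(serializedEvent)) {
      lost.incrementAndGet();
    }
  }

  long lostCount()
  {
    return lost.get();
  }
}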


@@ -37,6 +37,8 @@ public class KafkaEmitterConfig
private final String metricTopic;
@JsonProperty("alert.topic")
private final String alertTopic;
@Nullable @JsonProperty("request.topic")
private final String requestTopic;
@JsonProperty
private final String clusterName;
@JsonProperty("producer.config")
@@ -47,6 +49,7 @@ public class KafkaEmitterConfig
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers,
@JsonProperty("metric.topic") String metricTopic,
@JsonProperty("alert.topic") String alertTopic,
@Nullable @JsonProperty("request.topic") String requestTopic,
@JsonProperty("clusterName") String clusterName,
@JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig
)
@@ -54,6 +57,7 @@ public class KafkaEmitterConfig
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null");
this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null");
this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null");
this.requestTopic = requestTopic;
this.clusterName = clusterName;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
}
@@ -82,6 +86,12 @@ public class KafkaEmitterConfig
return clusterName;
}
@Nullable
public String getRequestTopic()
{
return requestTopic;
}
@JsonProperty
public Map<String, String> getKafkaProducerConfig()
{
@@ -109,6 +119,11 @@ public class KafkaEmitterConfig
if (!getAlertTopic().equals(that.getAlertTopic())) {
return false;
}
if (getRequestTopic() != null ? !getRequestTopic().equals(that.getRequestTopic()) : that.getRequestTopic() != null) {
return false;
}
if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) {
return false;
}
@@ -121,6 +136,7 @@ public class KafkaEmitterConfig
int result = getBootstrapServers().hashCode();
result = 31 * result + getMetricTopic().hashCode();
result = 31 * result + getAlertTopic().hashCode();
result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0);
result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
result = 31 * result + getKafkaProducerConfig().hashCode();
return result;
@@ -133,6 +149,7 @@ public class KafkaEmitterConfig
"bootstrap.servers='" + bootstrapServers + '\'' +
", metric.topic='" + metricTopic + '\'' +
", alert.topic='" + alertTopic + '\'' +
", request.topic='" + requestTopic + '\'' +
", clusterName='" + clusterName + '\'' +
", Producer.config=" + kafkaProducerConfig +
'}';


@@ -43,13 +43,27 @@ public class KafkaEmitterConfigTest
public void testSerDeserKafkaEmitterConfig() throws IOException
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
"alertTest", "clusterNameTest",
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build()
"alertTest", "requestTest",
"clusterNameTest", ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build()
);
String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
.readValue(kafkaEmitterConfigString);
.readValue(kafkaEmitterConfigString);
Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
}
@Test
public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
"alertTest", null,
"clusterNameTest", ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build()
);
String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
.readValue(kafkaEmitterConfigString);
Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
}
@@ -57,8 +71,8 @@ public class KafkaEmitterConfigTest
public void testSerDeNotRequiredKafkaProducerConfig()
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest",
"alertTest", "clusterNameTest",
null
"alertTest", null,
"clusterNameTest", null
);
try {
@SuppressWarnings("unused")

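For reference, the round trip in these tests serializes the config using the key names from the `@JsonProperty` annotations shown earlier, so the written JSON is roughly the following (values are the test fixtures; field order may vary):

{
  "bootstrap.servers" : "hostname",
  "metric.topic" : "metricTest",
  "alert.topic" : "alertTest",
  "request.topic" : "requestTest",
  "clusterName" : "clusterNameTest",
  "producer.config" : { "testKey" : "testValue" }
}
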

@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.emitter.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@RunWith(Parameterized.class)
public class KafkaEmitterTest
{
@Parameterized.Parameter
public String requestTopic;
@Parameterized.Parameters(name = "{index}: requestTopic - {0}")
public static Object[] data()
{
return new Object[] {
"requests",
null
};
}
// there is a 10-second wait in the Kafka emitter before it starts sending events to the broker, so set a 15-second timeout
@Test(timeout = 15_000)
public void testKafkaEmitter() throws InterruptedException
{
final List<ServiceMetricEvent> serviceMetricEvents = ImmutableList.of(
ServiceMetricEvent.builder().build("m1", 1).build("service", "host")
);
final List<AlertEvent> alertEvents = ImmutableList.of(
new AlertEvent("service", "host", "description")
);
final List<RequestLogEvent> requestLogEvents = ImmutableList.of(
DefaultRequestLogEventBuilderFactory.instance().createRequestLogEventBuilder("requests",
RequestLogLine.forSql("", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of()))
).build("service", "host")
);
int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size();
int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size();
final CountDownLatch countDownSentEvents = new CountDownLatch(
requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
final KafkaProducer<String, String> producer = EasyMock.createStrictMock(KafkaProducer.class);
final KafkaEmitter kafkaEmitter = new KafkaEmitter(
new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null),
new ObjectMapper()
)
{
@Override
protected Producer<String, String> setKafkaProducer()
{
return producer;
}
@Override
protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue,
Callback callback
)
{
countDownSentEvents.countDown();
super.sendToKafka(topic, recordQueue, callback);
}
};
EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null)
.times(requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
EasyMock.replay(producer);
kafkaEmitter.start();
for (Event event : serviceMetricEvents) {
kafkaEmitter.emit(event);
}
for (Event event : alertEvents) {
kafkaEmitter.emit(event);
}
for (Event event : requestLogEvents) {
kafkaEmitter.emit(event);
}
countDownSentEvents.await();
Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getRequestLostCount());
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
while (true) {
try {
EasyMock.verify(producer);
break;
}
catch (Throwable e) {
// although the latch may have counted down, producer.send may not have been called yet in KafkaEmitter,
// so wait for some time before verifying the mock
Thread.sleep(100);
// just continue
}
}
}
}


@@ -28,6 +28,7 @@ import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
import org.joda.time.DateTime;
import java.util.HashMap;
import java.util.Map;
/**
@@ -56,7 +57,21 @@ public final class DefaultRequestLogEvent implements RequestLogEvent
@Override
public Map<String, Object> toMap()
{
return ImmutableMap.of();
final Map<String, Object> map = new HashMap<>();
map.put("feed", getFeed());
map.put("timestamp", getCreatedTime());
map.put("service", getService());
map.put("host", getHost());
if (getQuery() != null) {
map.put("query", getQuery());
}
if (getSql() != null) {
map.put("sql", getSql());
map.put("sqlQueryContext", getSqlQueryContext());
}
map.put("remoteAddr", getRemoteAddr());
map.put("queryStats", getQueryStats());
return map;
}
@Override


@@ -26,15 +26,20 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class DefaultRequestLogEventTest
{
private ObjectMapper objectMapper = new DefaultObjectMapper();
@@ -68,4 +73,81 @@ public class DefaultRequestLogEventTest
String expected = "{\"feed\":\"feed\",\"query\":{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]},\"descending\":true,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[],\"postAggregations\":[],\"limit\":5,\"context\":{\"key\":\"value\"}},\"host\":\"127.0.0.1\",\"timestamp\":\"2019-12-12T03:01:00.000Z\",\"service\":\"druid-service\",\"sql\":null,\"sqlQueryContext\":{},\"remoteAddr\":\"127.0.0.1\",\"queryStats\":{\"query/time\":13,\"query/bytes\":10,\"success\":true,\"identity\":\"allowAll\"}}";
Assert.assertEquals(objectMapper.readTree(expected), objectMapper.readTree(logEventJson));
}
@Test
public void testDefaultRequestLogEventToMap()
{
final String feed = "test";
final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1);
final String service = "druid-service";
final String host = "127.0.0.1";
final Query query = new TimeseriesQuery(
new TableDataSource("dummy"),
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))),
true,
VirtualColumns.EMPTY,
null,
Granularities.ALL,
ImmutableList.of(),
ImmutableList.of(),
5,
ImmutableMap.of("key", "value"));
final QueryStats queryStats = new QueryStats(
ImmutableMap.of("query/time", 13L, "query/bytes", 10L, "success", true, "identity", "allowAll"));
RequestLogLine nativeLine = RequestLogLine.forNative(
query,
timestamp,
host,
queryStats
);
DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent(
ImmutableMap.of("service", service, "host", host), feed, nativeLine
);
final Map<String, Object> expected = new HashMap<>();
expected.put("feed", feed);
expected.put("timestamp", timestamp);
expected.put("service", service);
expected.put("host", host);
expected.put("query", query);
expected.put("remoteAddr", host);
expected.put("queryStats", queryStats);
Assert.assertEquals(expected, defaultRequestLogEvent.toMap());
}
@Test
public void testDefaultRequestLogEventToMapSQL()
{
final String feed = "test";
final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1);
final String service = "druid-service";
final String host = "127.0.0.1";
final String sql = "select * from 1337";
final QueryStats queryStats = new QueryStats(
ImmutableMap.of("sqlQuery/time", 13L, "sqlQuery/bytes", 10L, "success", true, "identity", "allowAll"));
RequestLogLine nativeLine = RequestLogLine.forSql(
sql,
ImmutableMap.of(),
timestamp,
host,
queryStats
);
DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent(
ImmutableMap.of("service", service, "host", host), feed, nativeLine
);
final Map<String, Object> expected = new HashMap<>();
expected.put("feed", feed);
expected.put("timestamp", timestamp);
expected.put("service", service);
expected.put("host", host);
expected.put("sql", sql);
expected.put("sqlQueryContext", ImmutableMap.of());
expected.put("remoteAddr", host);
expected.put("queryStats", queryStats);
Assert.assertEquals(expected, defaultRequestLogEvent.toMap());
}
}