Adding Kafka-emitter (#3860)

* Initial commit

* Apply another config: clustername

* Rename variable

* Fix bug

* Add retry logic

* Edit retry logic

* Upgrade kafka-clients version to the most recent release

* Make callback single object

* Write documentation

* Rewrite error message and emit logic

* Handling AlertEvent

* Override toString()

* make clusterName more optional

* bump up druid version

* add producer.config option which make user can apply another optional config value of kafka producer

* remove potential blocking in emit()

* using MemoryBoundLinkedBlockingQueue

* Fixing coding convention

* Remove logging every exception and just increment counting

* refactoring

* trivial modification

* logging when callback has exception

* Replace kafka-clients 0.10.1.1 with 0.10.2.0

* Resolve the problem related of classloader

* adopt try statement

* code reformatting

* make variables final

* rewrite toString
This commit is contained in:
Dongkyu Hwangbo 2017-04-05 06:07:43 +09:00 committed by Fangjin Yang
parent b166e13d2b
commit 0d2e91ed50
10 changed files with 676 additions and 0 deletions

View File

@ -0,0 +1,35 @@
---
layout: doc_page
---
# Kafka Emitter
To use this extension, make sure to [include](../../operations/including-extensions.html) `kafka-emitter` extension.
## Introduction
This extension emits Druid metrics to a [Kafka](https://kafka.apache.org) directly with JSON format.<br>
Currently, Kafka has not only their nice ecosystem but also consumer API readily available.
So, If you currently use Kafka, It's easy to integrate various tool or UI
to monitor the status of your Druid cluster with this extension.
## Configuration
All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`.
|property|description|required?|default|
|--------|-----------|---------|-------|
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none|
|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none|
|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none|
### Example
```
druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
druid.emitter.kafka.metric.topic=druid-metric
druid.emitter.kafka.alert.topic=druid-alert
druid.emitter.kafka.producer.config={"max.block.ms":10000}
```

View File

@ -65,6 +65,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)| |sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)| |graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)| |statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)|
|kafka-emitter|Kafka metrics emitter|[link](../development/extensions-contrib/kafka-emitter.html)|
|druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)| |druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)|
|scan-query|Scan query|[link](../development/extensions-contrib/scan-query.html)| |scan-query|Scan query|[link](../development/extensions-contrib/scan-query.html)|

View File

@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.10.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>io.druid.extensions.contrib</groupId>
<artifactId>kafka-emitter</artifactId>
<name>kafka-emitter</name>
<description>Druid emitter extension to support kafka</description>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pl.pragmatists</groupId>
<artifactId>JUnitParams</artifactId>
<version>1.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,199 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.emitter.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.common.logger.Logger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class KafkaEmitter implements Emitter
{
private static Logger log = new Logger(KafkaEmitter.class);
private final static int DEFAULT_RETRIES = 3;
private final AtomicLong metricLost;
private final AtomicLong alertLost;
private final AtomicLong invalidLost;
private final KafkaEmitterConfig config;
private final Producer<String, String> producer;
private final Callback producerCallback;
private final ObjectMapper jsonMapper;
private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
private final ScheduledExecutorService scheduler;
public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
{
this.config = config;
this.jsonMapper = jsonMapper;
this.producer = setKafkaProducer();
this.producerCallback = setProducerCallback();
// same with kafka producer's buffer.memory
long queueMemoryBound = Long.parseLong(this.config.getKafkaProducerConfig()
.getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"));
this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.scheduler = Executors.newScheduledThreadPool(3);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
this.invalidLost = new AtomicLong(0L);
}
private Callback setProducerCallback()
{
return (recordMetadata, e) -> {
if (e != null) {
log.debug("Event send failed [%s]", e.getMessage());
if (recordMetadata.topic().equals(config.getMetricTopic())) {
metricLost.incrementAndGet();
} else if (recordMetadata.topic().equals(config.getAlertTopic())) {
alertLost.incrementAndGet();
} else {
invalidLost.incrementAndGet();
}
}
};
}
private Producer<String, String> setKafkaProducer()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES);
props.putAll(config.getKafkaProducerConfig());
return new KafkaProducer<>(props);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
}
}
@Override
@LifecycleStart
public void start()
{
scheduler.scheduleWithFixedDelay(this::sendMetricToKafka, 10, 10, TimeUnit.SECONDS);
scheduler.scheduleWithFixedDelay(this::sendAlertToKafka, 10, 10, TimeUnit.SECONDS);
scheduler.scheduleWithFixedDelay(() -> {
log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]",
metricLost.get(), alertLost.get(), invalidLost.get());
}, 5, 5, TimeUnit.MINUTES);
log.info("Starting Kafka Emitter.");
}
private void sendMetricToKafka()
{
sendToKafka(config.getMetricTopic(), metricQueue);
}
private void sendAlertToKafka()
{
sendToKafka(config.getAlertTopic(), alertQueue);
}
private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
{
ObjectContainer<String> objectToSend;
try {
while (true) {
objectToSend = recordQueue.take();
producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
}
}
catch (InterruptedException e) {
log.warn(e, "Failed to take record from queue!");
}
}
@Override
public void emit(final Event event)
{
if (event != null) {
ImmutableMap.Builder<String, Object> resultBuilder = ImmutableMap.<String, Object>builder().putAll(event.toMap());
if (config.getClusterName() != null) {
resultBuilder.put("clusterName", config.getClusterName());
}
Map<String, Object> result = resultBuilder.build();
try {
String resultJson = jsonMapper.writeValueAsString(result);
ObjectContainer<String> objectContainer = new ObjectContainer<>(resultJson, resultJson.getBytes().length);
if (event instanceof ServiceMetricEvent) {
if (!metricQueue.offer(objectContainer)) {
metricLost.incrementAndGet();
}
} else if (event instanceof AlertEvent) {
if (!alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else {
invalidLost.incrementAndGet();
}
}
catch (JsonProcessingException e) {
invalidLost.incrementAndGet();
}
}
}
@Override
public void flush() throws IOException
{
producer.flush();
}
@Override
@LifecycleStop
public void close() throws IOException
{
scheduler.shutdownNow();
producer.close();
}
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.emitter.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Map;
public class KafkaEmitterConfig
{
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
final private String bootstrapServers;
@JsonProperty("metric.topic")
final private String metricTopic;
@JsonProperty("alert.topic")
final private String alertTopic;
@JsonProperty
final private String clusterName;
@JsonProperty("producer.config")
private Map<String, String> kafkaProducerConfig;
@JsonCreator
public KafkaEmitterConfig(
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers,
@JsonProperty("metric.topic") String metricTopic,
@JsonProperty("alert.topic") String alertTopic,
@JsonProperty("clusterName") String clusterName,
@JsonProperty("producer.config") Map<String, String> kafkaProducerConfig
)
{
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null");
this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null");
this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null");
this.clusterName = clusterName;
this.kafkaProducerConfig = kafkaProducerConfig;
}
@JsonProperty
public String getBootstrapServers()
{
return bootstrapServers;
}
@JsonProperty
public String getMetricTopic()
{
return metricTopic;
}
@JsonProperty
public String getAlertTopic()
{
return alertTopic;
}
@JsonProperty
public String getClusterName()
{
return clusterName;
}
@JsonProperty
public Map<String, String> getKafkaProducerConfig()
{
return kafkaProducerConfig;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KafkaEmitterConfig that = (KafkaEmitterConfig) o;
if (!getBootstrapServers().equals(that.getBootstrapServers())) {
return false;
}
if (!getMetricTopic().equals(that.getMetricTopic())) {
return false;
}
if (!getAlertTopic().equals(that.getAlertTopic())) {
return false;
}
if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) {
return false;
}
return getKafkaProducerConfig() != null
? getKafkaProducerConfig().equals(that.getKafkaProducerConfig())
: that.getKafkaProducerConfig() == null;
}
@Override
public int hashCode()
{
int result = getBootstrapServers().hashCode();
result = 31 * result + getMetricTopic().hashCode();
result = 31 * result + getAlertTopic().hashCode();
result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
result = 31 * result + (getKafkaProducerConfig() != null ? getKafkaProducerConfig().hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "KafkaEmitterConfig{" +
"bootstrap.servers='" + bootstrapServers + '\'' +
", metric.topic='" + metricTopic + '\'' +
", alert.topic='" + alertTopic + '\'' +
", clusterName='" + clusterName + '\'' +
", Producer.config=" + kafkaProducerConfig +
'}';
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.emitter.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import com.metamx.emitter.core.Emitter;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.ManageLifecycle;
import io.druid.initialization.DruidModule;
import java.util.Collections;
import java.util.List;
public class KafkaEmitterModule implements DruidModule
{
private static final String EMITTER_TYPE = "kafka";
@Override
public List<? extends Module> getJacksonModules()
{
return Collections.EMPTY_LIST;
}
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, KafkaEmitterConfig.class);
}
@Provides
@ManageLifecycle
@Named(EMITTER_TYPE)
public Emitter getEmitter(KafkaEmitterConfig kafkaEmitterConfig, ObjectMapper mapper)
{
return new KafkaEmitter(kafkaEmitterConfig, mapper);
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.emitter.kafka;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
/**
* Similar to LinkedBlockingQueue but can be bounded by the total byte size of the items present in the queue
* rather than number of items.
*/
public class MemoryBoundLinkedBlockingQueue<T>
{
private final long memoryBound;
private final AtomicLong currentMemory;
private final LinkedBlockingQueue<ObjectContainer<T>> queue;
public MemoryBoundLinkedBlockingQueue(long memoryBound)
{
this.memoryBound = memoryBound;
this.currentMemory = new AtomicLong(0L);
this.queue = new LinkedBlockingQueue<>();
}
// returns true/false depending on whether item was added or not
public boolean offer(ObjectContainer<T> item)
{
final long itemLength = item.getSize();
if (currentMemory.addAndGet(itemLength) <= memoryBound) {
if (queue.offer(item)) {
return true;
}
}
currentMemory.addAndGet(-itemLength);
return false;
}
// blocks until at least one item is available to take
public ObjectContainer<T> take() throws InterruptedException
{
final ObjectContainer<T> ret = queue.take();
currentMemory.addAndGet(-ret.getSize());
return ret;
}
public long getAvailableBuffer()
{
return memoryBound - currentMemory.get();
}
public int size()
{
return queue.size();
}
public static class ObjectContainer<T>
{
private T data;
private long size;
ObjectContainer(T data, long size)
{
this.data = data;
this.size = size;
}
public T getData()
{
return data;
}
public long getSize()
{
return size;
}
}
}

View File

@ -0,0 +1 @@
io.druid.emitter.kafka.KafkaEmitterModule

View File

@ -0,0 +1,55 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.emitter.kafka;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class KafkaEmitterConfigTest
{
private ObjectMapper mapper = new DefaultObjectMapper();
@Before
public void setUp()
{
mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()));
}
@Test
public void testSerDeserKafkaEmitterConfig() throws IOException
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
"alertTest", "clusterNameTest",
ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build()
);
String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.reader(KafkaEmitterConfig.class)
.readValue(kafkaEmitterConfigString);
Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
}
}

View File

@ -128,6 +128,7 @@
<module>extensions-contrib/ambari-metrics-emitter</module> <module>extensions-contrib/ambari-metrics-emitter</module>
<module>extensions-contrib/scan-query</module> <module>extensions-contrib/scan-query</module>
<module>extensions-contrib/sqlserver-metadata-storage</module> <module>extensions-contrib/sqlserver-metadata-storage</module>
<module>extensions-contrib/kafka-emitter</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>