diff --git a/docs/content/development/extensions-contrib/kafka-emitter.md b/docs/content/development/extensions-contrib/kafka-emitter.md new file mode 100644 index 00000000000..fb1371db5c7 --- /dev/null +++ b/docs/content/development/extensions-contrib/kafka-emitter.md @@ -0,0 +1,35 @@ +--- +layout: doc_page +--- + +# Kafka Emitter + +To use this extension, make sure to [include](../../operations/including-extensions.html) the `kafka-emitter` extension. + +## Introduction + +This extension emits Druid metrics to [Kafka](https://kafka.apache.org) directly in JSON format.
+Kafka has not only a rich ecosystem but also consumer APIs readily available. +So, if you already use Kafka, it is easy to integrate various tools or UIs +to monitor the status of your Druid cluster with this extension. + +## Configuration + +All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`. + +|property|description|required?|default| +|--------|-----------|---------|-------| +|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka brokers. (`[hostname:port],[hostname:port]...`)|yes|none| +|`druid.emitter.kafka.metric.topic`|Kafka topic name to which service metrics are emitted.|yes|none| +|`druid.emitter.kafka.alert.topic`|Kafka topic name to which alerts are emitted.|yes|none| +|`druid.emitter.kafka.producer.config`|JSON-formatted configuration of additional properties the user wants to set on the Kafka producer.|no|none| +|`druid.emitter.kafka.clusterName`|Optional name of your Druid cluster. It can help group metrics in your monitoring environment. 
|no|none| + +### Example + +``` +druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092 +druid.emitter.kafka.metric.topic=druid-metric +druid.emitter.kafka.alert.topic=druid-alert +druid.emitter.kafka.producer.config={"max.block.ms":10000} +``` diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 45c116d22b0..779ea6b65d9 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -65,6 +65,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c |sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)| |graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)| |statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)| +|kafka-emitter|Kafka metrics emitter|[link](../development/extensions-contrib/kafka-emitter.html)| |druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)| |scan-query|Scan query|[link](../development/extensions-contrib/scan-query.html)| diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml new file mode 100644 index 00000000000..ed74ca02f56 --- /dev/null +++ b/extensions-contrib/kafka-emitter/pom.xml @@ -0,0 +1,91 @@ + + + + + 4.0.0 + + + io.druid + druid + 0.10.0-SNAPSHOT + ../../pom.xml + + + io.druid.extensions.contrib + kafka-emitter + kafka-emitter + Druid emitter extension to support kafka + + + + org.apache.kafka + kafka-clients + 0.10.2.0 + + + io.druid + druid-common + ${project.parent.version} + provided + + + io.druid + druid-api + ${project.parent.version} + provided + + + com.metamx + emitter + provided + + + junit + junit + test + + + org.easymock + easymock + test + + + pl.pragmatists + JUnitParams + 1.0.4 + test + + + io.druid + druid-server + ${project.parent.version} + test-jar + 
test + + + io.druid + druid-processing + ${project.parent.version} + test-jar + test + + + diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java new file mode 100644 index 00000000000..74391bd1ba6 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitter.java @@ -0,0 +1,199 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.emitter.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.metamx.emitter.core.Emitter; +import com.metamx.emitter.core.Event; +import com.metamx.emitter.service.AlertEvent; +import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.java.util.common.logger.Logger; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class KafkaEmitter implements Emitter +{ + private static Logger log = new Logger(KafkaEmitter.class); + + private final static int DEFAULT_RETRIES = 3; + private final AtomicLong metricLost; + private final AtomicLong alertLost; + private final AtomicLong invalidLost; + + private final KafkaEmitterConfig config; + private final Producer producer; + private final Callback producerCallback; + private final ObjectMapper jsonMapper; + private final MemoryBoundLinkedBlockingQueue metricQueue; + private final MemoryBoundLinkedBlockingQueue alertQueue; + private final ScheduledExecutorService scheduler; + + public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) + { + this.config = config; + this.jsonMapper = jsonMapper; + 
this.producer = setKafkaProducer(); + this.producerCallback = setProducerCallback(); + // same with kafka producer's buffer.memory + long queueMemoryBound = Long.parseLong(this.config.getKafkaProducerConfig() + .getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")); + this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); + this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); + this.scheduler = Executors.newScheduledThreadPool(3); + this.metricLost = new AtomicLong(0L); + this.alertLost = new AtomicLong(0L); + this.invalidLost = new AtomicLong(0L); + } + + private Callback setProducerCallback() + { + return (recordMetadata, e) -> { + if (e != null) { + log.debug("Event send failed [%s]", e.getMessage()); + if (recordMetadata.topic().equals(config.getMetricTopic())) { + metricLost.incrementAndGet(); + } else if (recordMetadata.topic().equals(config.getAlertTopic())) { + alertLost.incrementAndGet(); + } else { + invalidLost.incrementAndGet(); + } + } + }; + } + + private Producer setKafkaProducer() + { + ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES); + props.putAll(config.getKafkaProducerConfig()); + + return new KafkaProducer<>(props); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxCl); + } + } + + @Override + @LifecycleStart + public void start() + { + scheduler.scheduleWithFixedDelay(this::sendMetricToKafka, 10, 10, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(this::sendAlertToKafka, 10, 10, TimeUnit.SECONDS); + 
scheduler.scheduleWithFixedDelay(() -> { + log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]", + metricLost.get(), alertLost.get(), invalidLost.get()); + }, 5, 5, TimeUnit.MINUTES); + log.info("Starting Kafka Emitter."); + } + + private void sendMetricToKafka() + { + sendToKafka(config.getMetricTopic(), metricQueue); + } + + private void sendAlertToKafka() + { + sendToKafka(config.getAlertTopic(), alertQueue); + } + + private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue) + { + ObjectContainer objectToSend; + try { + while (true) { + objectToSend = recordQueue.take(); + producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback); + } + } + catch (InterruptedException e) { + log.warn(e, "Failed to take record from queue!"); + } + } + + @Override + public void emit(final Event event) + { + if (event != null) { + ImmutableMap.Builder resultBuilder = ImmutableMap.builder().putAll(event.toMap()); + if (config.getClusterName() != null) { + resultBuilder.put("clusterName", config.getClusterName()); + } + Map result = resultBuilder.build(); + + try { + String resultJson = jsonMapper.writeValueAsString(result); + ObjectContainer objectContainer = new ObjectContainer<>(resultJson, resultJson.getBytes().length); + if (event instanceof ServiceMetricEvent) { + if (!metricQueue.offer(objectContainer)) { + metricLost.incrementAndGet(); + } + } else if (event instanceof AlertEvent) { + if (!alertQueue.offer(objectContainer)) { + alertLost.incrementAndGet(); + } + } else { + invalidLost.incrementAndGet(); + } + } + catch (JsonProcessingException e) { + invalidLost.incrementAndGet(); + } + } + } + + @Override + public void flush() throws IOException + { + producer.flush(); + } + + @Override + @LifecycleStop + public void close() throws IOException + { + scheduler.shutdownNow(); + producer.close(); + } +} diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java new file mode 100644 index 00000000000..b5ca3ce096d --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterConfig.java @@ -0,0 +1,140 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.emitter.kafka; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Map; + +public class KafkaEmitterConfig +{ + + @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + final private String bootstrapServers; + @JsonProperty("metric.topic") + final private String metricTopic; + @JsonProperty("alert.topic") + final private String alertTopic; + @JsonProperty + final private String clusterName; + @JsonProperty("producer.config") + private Map kafkaProducerConfig; + + @JsonCreator + public KafkaEmitterConfig( + @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, + @JsonProperty("metric.topic") String metricTopic, + @JsonProperty("alert.topic") String alertTopic, + @JsonProperty("clusterName") String clusterName, + @JsonProperty("producer.config") Map kafkaProducerConfig + ) + { + this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); + this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null"); + this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null"); + this.clusterName = clusterName; + this.kafkaProducerConfig = kafkaProducerConfig; + } + + @JsonProperty + public String getBootstrapServers() + { + return bootstrapServers; + } + + @JsonProperty + public String getMetricTopic() + { + return metricTopic; + } + + @JsonProperty + public String getAlertTopic() + { + return alertTopic; + } + + @JsonProperty + public String getClusterName() + { + return clusterName; + } + + @JsonProperty + public Map getKafkaProducerConfig() + { + return kafkaProducerConfig; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + 
KafkaEmitterConfig that = (KafkaEmitterConfig) o; + + if (!getBootstrapServers().equals(that.getBootstrapServers())) { + return false; + } + if (!getMetricTopic().equals(that.getMetricTopic())) { + return false; + } + if (!getAlertTopic().equals(that.getAlertTopic())) { + return false; + } + if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { + return false; + } + return getKafkaProducerConfig() != null + ? getKafkaProducerConfig().equals(that.getKafkaProducerConfig()) + : that.getKafkaProducerConfig() == null; + } + + @Override + public int hashCode() + { + int result = getBootstrapServers().hashCode(); + result = 31 * result + getMetricTopic().hashCode(); + result = 31 * result + getAlertTopic().hashCode(); + result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); + result = 31 * result + (getKafkaProducerConfig() != null ? getKafkaProducerConfig().hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "KafkaEmitterConfig{" + + "bootstrap.servers='" + bootstrapServers + '\'' + + ", metric.topic='" + metricTopic + '\'' + + ", alert.topic='" + alertTopic + '\'' + + ", clusterName='" + clusterName + '\'' + + ", Producer.config=" + kafkaProducerConfig + + '}'; + } +} diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java new file mode 100644 index 00000000000..0608b1f0d89 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/KafkaEmitterModule.java @@ -0,0 +1,58 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.kafka; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.name.Named; +import com.metamx.emitter.core.Emitter; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.DruidModule; + +import java.util.Collections; +import java.util.List; + +public class KafkaEmitterModule implements DruidModule +{ + private static final String EMITTER_TYPE = "kafka"; + + @Override + public List getJacksonModules() + { + return Collections.EMPTY_LIST; + } + + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.emitter." 
+ EMITTER_TYPE, KafkaEmitterConfig.class); + } + + @Provides + @ManageLifecycle + @Named(EMITTER_TYPE) + public Emitter getEmitter(KafkaEmitterConfig kafkaEmitterConfig, ObjectMapper mapper) + { + return new KafkaEmitter(kafkaEmitterConfig, mapper); + } +} diff --git a/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java new file mode 100644 index 00000000000..b12d2a75d16 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/java/io/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java @@ -0,0 +1,95 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.emitter.kafka; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Similar to LinkedBlockingQueue but can be bounded by the total byte size of the items present in the queue + * rather than number of items. 
+ */ +public class MemoryBoundLinkedBlockingQueue +{ + private final long memoryBound; + private final AtomicLong currentMemory; + private final LinkedBlockingQueue> queue; + + public MemoryBoundLinkedBlockingQueue(long memoryBound) + { + this.memoryBound = memoryBound; + this.currentMemory = new AtomicLong(0L); + this.queue = new LinkedBlockingQueue<>(); + } + + // returns true/false depending on whether item was added or not + public boolean offer(ObjectContainer item) + { + final long itemLength = item.getSize(); + + if (currentMemory.addAndGet(itemLength) <= memoryBound) { + if (queue.offer(item)) { + return true; + } + } + currentMemory.addAndGet(-itemLength); + return false; + } + + // blocks until at least one item is available to take + public ObjectContainer take() throws InterruptedException + { + final ObjectContainer ret = queue.take(); + currentMemory.addAndGet(-ret.getSize()); + return ret; + } + + public long getAvailableBuffer() + { + return memoryBound - currentMemory.get(); + } + + public int size() + { + return queue.size(); + } + + public static class ObjectContainer + { + private T data; + private long size; + + ObjectContainer(T data, long size) + { + this.data = data; + this.size = size; + } + + public T getData() + { + return data; + } + + public long getSize() + { + return size; + } + } +} diff --git a/extensions-contrib/kafka-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/kafka-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..7fccf54ec6c --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.emitter.kafka.KafkaEmitterModule \ No newline at end of file diff --git a/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java 
b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java new file mode 100644 index 00000000000..37a8c113348 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/test/java/io/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.emitter.kafka; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class KafkaEmitterConfigTest +{ + private ObjectMapper mapper = new DefaultObjectMapper(); + + @Before + public void setUp() + { + mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper())); + } + + @Test + public void testSerDeserKafkaEmitterConfig() throws IOException + { + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", + "alertTest", "clusterNameTest", + ImmutableMap.builder() + .put("testKey", "testValue").build() + ); + String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.reader(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); + Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); + } +} diff --git a/pom.xml b/pom.xml index 631260b0506..0c197cac5b7 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,7 @@ extensions-contrib/ambari-metrics-emitter extensions-contrib/scan-query extensions-contrib/sqlserver-metadata-storage + extensions-contrib/kafka-emitter