mirror of https://github.com/apache/druid.git
Fix some flaws of KafkaEmitter (#9573)
* fix flaws of KafkaEmitter * fix flaws of KafkaEmitter * fix flaws of KafkaEmitter * Update extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java Co-Authored-By: Himanshu <g.himanshu@gmail.com> * Update extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java Co-Authored-By: Himanshu <g.himanshu@gmail.com> Co-authored-by: Himanshu <g.himanshu@gmail.com>
This commit is contained in:
parent
65de636893
commit
8ccc0b241a
|
@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
|
||||
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.java.util.emitter.core.Emitter;
|
||||
|
@ -56,7 +55,6 @@ public class KafkaEmitter implements Emitter
|
|||
|
||||
private final KafkaEmitterConfig config;
|
||||
private final Producer<String, String> producer;
|
||||
private final Callback producerCallback;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
|
||||
private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
|
||||
|
@ -67,7 +65,6 @@ public class KafkaEmitter implements Emitter
|
|||
this.config = config;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.producer = setKafkaProducer();
|
||||
this.producerCallback = setProducerCallback();
|
||||
// same with kafka producer's buffer.memory
|
||||
long queueMemoryBound = Long.parseLong(this.config.getKafkaProducerConfig()
|
||||
.getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"));
|
||||
|
@ -79,18 +76,12 @@ public class KafkaEmitter implements Emitter
|
|||
this.invalidLost = new AtomicLong(0L);
|
||||
}
|
||||
|
||||
private Callback setProducerCallback()
|
||||
private Callback setProducerCallback(AtomicLong lostCouter)
|
||||
{
|
||||
return (recordMetadata, e) -> {
|
||||
if (e != null) {
|
||||
log.debug("Event send failed [%s]", e.getMessage());
|
||||
if (recordMetadata.topic().equals(config.getMetricTopic())) {
|
||||
metricLost.incrementAndGet();
|
||||
} else if (recordMetadata.topic().equals(config.getAlertTopic())) {
|
||||
alertLost.incrementAndGet();
|
||||
} else {
|
||||
invalidLost.incrementAndGet();
|
||||
}
|
||||
lostCouter.incrementAndGet();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -116,11 +107,10 @@ public class KafkaEmitter implements Emitter
|
|||
}
|
||||
|
||||
@Override
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
scheduler.scheduleWithFixedDelay(this::sendMetricToKafka, 10, 10, TimeUnit.SECONDS);
|
||||
scheduler.scheduleWithFixedDelay(this::sendAlertToKafka, 10, 10, TimeUnit.SECONDS);
|
||||
scheduler.schedule(this::sendMetricToKafka, 10, TimeUnit.SECONDS);
|
||||
scheduler.schedule(this::sendAlertToKafka, 10, TimeUnit.SECONDS);
|
||||
scheduler.scheduleWithFixedDelay(() -> {
|
||||
log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]",
|
||||
metricLost.get(), alertLost.get(), invalidLost.get());
|
||||
|
@ -130,25 +120,25 @@ public class KafkaEmitter implements Emitter
|
|||
|
||||
private void sendMetricToKafka()
|
||||
{
|
||||
sendToKafka(config.getMetricTopic(), metricQueue);
|
||||
sendToKafka(config.getMetricTopic(), metricQueue, setProducerCallback(metricLost));
|
||||
}
|
||||
|
||||
private void sendAlertToKafka()
|
||||
{
|
||||
sendToKafka(config.getAlertTopic(), alertQueue);
|
||||
sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
|
||||
}
|
||||
|
||||
private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue)
|
||||
private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
|
||||
{
|
||||
ObjectContainer<String> objectToSend;
|
||||
try {
|
||||
while (true) {
|
||||
objectToSend = recordQueue.take();
|
||||
producer.send(new ProducerRecord<>(topic, objectToSend.getData()), producerCallback);
|
||||
producer.send(new ProducerRecord<>(topic, objectToSend.getData()), callback);
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
log.warn(e, "Failed to take record from queue!");
|
||||
catch (Throwable e) {
|
||||
log.warn(e, "Exception while getting record from queue or producer send, Events would not be emitted anymore.");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue