From 18d42cae3ff9ebe5f255ed356864793b20c034c1 Mon Sep 17 00:00:00 2001 From: Tom Date: Thu, 18 Jan 2024 13:27:40 -0800 Subject: [PATCH] Kafka emitter wasn't given the correct number of threads. It should be 1 thread per scheduled task. (#15719) This change intelligently provisions the correct number of threads per scheduled task. 1 for each event type, and 1 for logging the lost events. This is a change to make this work. But in the future it would be worthwhile to make each task not be greedy and share threads so there isn't a need of a thread per task. --- .../java/org/apache/druid/emitter/kafka/KafkaEmitter.java | 4 +++- .../org/apache/druid/emitter/kafka/KafkaEmitterConfig.java | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 7485cbaab6d..87183d62fe7 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -84,7 +84,9 @@ public class KafkaEmitter implements Emitter this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); - this.scheduler = Executors.newScheduledThreadPool(4); + // need one thread per scheduled task. Scheduled tasks are per eventType and 1 for reporting the lost events + int numOfThreads = config.getEventTypes().size() + 1; + this.scheduler = Executors.newScheduledThreadPool(numOfThreads); this.metricLost = new AtomicLong(0L); this.alertLost = new AtomicLong(0L); this.requestLost = new AtomicLong(0L); diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index d6d823c0a88..c7038079aa4 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -30,6 +30,7 @@ import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.apache.kafka.clients.producer.ProducerConfig; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.HashSet; import java.util.Map; @@ -124,6 +125,7 @@ public class KafkaEmitterConfig } @JsonProperty + @Nonnull public Set getEventTypes() { return eventTypes;