diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 7485cbaab6d..87183d62fe7 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -84,7 +84,9 @@ public class KafkaEmitter implements Emitter this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); - this.scheduler = Executors.newScheduledThreadPool(4); + // need one thread per scheduled task. Scheduled tasks are per eventType and 1 for reporting the lost events + int numOfThreads = config.getEventTypes().size() + 1; + this.scheduler = Executors.newScheduledThreadPool(numOfThreads); this.metricLost = new AtomicLong(0L); this.alertLost = new AtomicLong(0L); this.requestLost = new AtomicLong(0L); diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index d6d823c0a88..c7038079aa4 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -30,6 +30,7 @@ import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.apache.kafka.clients.producer.ProducerConfig; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.HashSet; import java.util.Map; @@ -124,6 +125,7 @@ public class KafkaEmitterConfig } @JsonProperty + @Nonnull public Set getEventTypes() { return eventTypes;