diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 67900f61e23..977e6054f19 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -367,8 +367,12 @@ public class KafkaSupervisor implements Supervisor exec.submit( () -> { try { - while (!Thread.currentThread().isInterrupted()) { - final Notice notice = notices.take(); + long pollTimeout = Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS); + while (!Thread.currentThread().isInterrupted() && !stopped) { + final Notice notice = notices.poll(pollTimeout, TimeUnit.MILLISECONDS); + if (notice == null) { + continue; + } try { notice.handle();