diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index e85e6fbb6da..662b8b03bec 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -46,7 +46,7 @@ import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -91,19 +91,19 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
       TaskToolbox toolbox
   ) throws Exception
   {
-    // Handles OffsetOutOfRangeException, which is thrown if the seeked-to
-    // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
-    // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
-    List<OrderedPartitionableRecord<Integer, Long>> records = new ArrayList<>();
     try {
-      records = recordSupplier.poll(task.getIOConfig().getPollTimeout());
+      return recordSupplier.poll(task.getIOConfig().getPollTimeout());
     }
     catch (OffsetOutOfRangeException e) {
+      //
+      // Handles OffsetOutOfRangeException, which is thrown if the seeked-to
+      // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
+      // that has not been written yet (which is totally legitimate). So let's wait for it to show up
+      //
       log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
       possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), recordSupplier, toolbox);
+      return Collections.emptyList();
     }
-
-    return records;
   }
 
   @Override
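
For reference, the method body produced by this patch, assembled from the added and context lines of the hunk above, looks roughly as follows. The enclosing signature (the getRecords name, the @Nonnull annotation, the exact return type, and the recordSupplier parameter) is not fully visible in the hunk and is an assumption for illustration only.

@Nonnull
protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
    RecordSupplier<Integer, Long> recordSupplier,
    TaskToolbox toolbox
) throws Exception
{
  try {
    // Happy path: return whatever the poll produced within the configured timeout.
    return recordSupplier.poll(task.getIOConfig().getPollTimeout());
  }
  catch (OffsetOutOfRangeException e) {
    // The seeked-to offset is not present in the topic-partition, e.g. the data has
    // not been written yet (which is totally legitimate). Reset offsets or wait, then
    // hand back an empty batch so the caller simply polls again on its next pass.
    log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
    possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), recordSupplier, toolbox);
    return Collections.emptyList();
  }
}

Compared with the pre-patch version, the poll result is returned directly instead of flowing through a mutable local, and the recovery path returns Collections.emptyList() explicitly, which is why java.util.ArrayList is no longer imported and java.util.Collections is.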