From 48a96f5d06bf4b4d1532b0ec96eb631b4287e30d Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Wed, 13 Dec 2023 12:03:17 +0530 Subject: [PATCH] Better automatic offset reset for Kinesis ingestion (#15338) Better automatic offset reset for Kinesis ingestion --- .../kinesis/KinesisIndexTaskRunner.java | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java index 22448ba67a0..75f23da0e1f 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java @@ -42,12 +42,12 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner { @@ -125,28 +125,42 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner currOffsets = getCurrentOffsets(); + final Map, String> partitionToSequenceResetMap = new HashMap<>(); for (final StreamPartition streamPartition : assignment) { String sequence = currOffsets.get(streamPartition.getPartitionId()); if (!recordSupplier.isOffsetAvailable(streamPartition, KinesisSequenceNumber.of(sequence))) { - if (task.getTuningConfig().isResetOffsetAutomatically()) { - log.info("Attempting to reset sequences automatically for all partitions"); - try { - sendResetRequestAndWait( - assignment.stream() - .collect(Collectors.toMap(x -> x, x -> currOffsets.get(x.getPartitionId()))), - toolbox - ); - } - catch (IOException e) { - throw new ISE(e, "Exception while attempting to automatically reset sequences"); - } - } else { + partitionToSequenceResetMap.put(streamPartition, sequence); + } + } + + if (!partitionToSequenceResetMap.isEmpty()) { + for (Map.Entry, String> partitionToSequence : partitionToSequenceResetMap.entrySet()) { + log.warn("Starting sequenceNumber[%s] is no longer available for partition[%s].", + partitionToSequence.getValue(), + partitionToSequence.getKey() + ); + } + if (task.getTuningConfig().isResetOffsetAutomatically()) { + log.info( + "Attempting to reset offsets for [%d] partitions with ids[%s].", + partitionToSequenceResetMap.size(), + partitionToSequenceResetMap.keySet() + ); + try { + sendResetRequestAndWait(partitionToSequenceResetMap, toolbox); + } + catch (IOException e) { throw new ISE( - "Starting sequenceNumber [%s] is no longer available for partition [%s] and resetOffsetAutomatically is not enabled", - sequence, - streamPartition.getPartitionId() + e, + "Exception while attempting to automatically reset sequences for partitions[%s]", + partitionToSequenceResetMap.keySet() ); } + } else { + throw new ISE( + "Automatic offset reset is disabled, but there are partitions with unavailable sequence numbers [%s].", + partitionToSequenceResetMap + ); } } } @@ -191,5 +205,4 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner