Better automatic offset reset for Kinesis ingestion (#15338)

Better automatic offset reset for Kinesis ingestion
This commit is contained in:
AmatyaAvadhanula 2023-12-13 12:03:17 +05:30 committed by GitHub
parent 4ec9a0a7f7
commit 48a96f5d06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 18 deletions

View File

@ -42,12 +42,12 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String, String, ByteEntity> public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String, String, ByteEntity>
{ {
@ -125,32 +125,46 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
{ {
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) { if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets(); final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
final Map<StreamPartition<String>, String> partitionToSequenceResetMap = new HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) { for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId()); String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition, KinesisSequenceNumber.of(sequence))) { if (!recordSupplier.isOffsetAvailable(streamPartition, KinesisSequenceNumber.of(sequence))) {
if (task.getTuningConfig().isResetOffsetAutomatically()) { partitionToSequenceResetMap.put(streamPartition, sequence);
log.info("Attempting to reset sequences automatically for all partitions"); }
try { }
sendResetRequestAndWait(
assignment.stream() if (!partitionToSequenceResetMap.isEmpty()) {
.collect(Collectors.toMap(x -> x, x -> currOffsets.get(x.getPartitionId()))), for (Map.Entry<StreamPartition<String>, String> partitionToSequence : partitionToSequenceResetMap.entrySet()) {
toolbox log.warn("Starting sequenceNumber[%s] is no longer available for partition[%s].",
partitionToSequence.getValue(),
partitionToSequence.getKey()
); );
} }
if (task.getTuningConfig().isResetOffsetAutomatically()) {
log.info(
"Attempting to reset offsets for [%d] partitions with ids[%s].",
partitionToSequenceResetMap.size(),
partitionToSequenceResetMap.keySet()
);
try {
sendResetRequestAndWait(partitionToSequenceResetMap, toolbox);
}
catch (IOException e) { catch (IOException e) {
throw new ISE(e, "Exception while attempting to automatically reset sequences"); throw new ISE(
e,
"Exception while attempting to automatically reset sequences for partitions[%s]",
partitionToSequenceResetMap.keySet()
);
} }
} else { } else {
throw new ISE( throw new ISE(
"Starting sequenceNumber [%s] is no longer available for partition [%s] and resetOffsetAutomatically is not enabled", "Automatic offset reset is disabled, but there are partitions with unavailable sequence numbers [%s].",
sequence, partitionToSequenceResetMap
streamPartition.getPartitionId()
); );
} }
} }
} }
} }
}
@Override @Override
protected boolean isEndOffsetExclusive() protected boolean isEndOffsetExclusive()
@ -191,5 +205,4 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
return null; return null;
} }
} }
} }