diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 769413d6ffc..9fc743bd23b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -213,6 +213,7 @@ public abstract class SeekableStreamIndexTaskRunner publishingSequences = Sets.newConcurrentHashSet(); + private final Set publishedSequences = Sets.newConcurrentHashSet(); private final List> publishWaitList = new ArrayList<>(); private final List> handOffWaitList = new ArrayList<>(); @@ -806,7 +807,8 @@ public abstract class SeekableStreamIndexTaskRunner> sequencesSnapshot = new ArrayList<>(sequences); for (int i = 0; i < sequencesSnapshot.size(); i++) { final SequenceMetadata sequenceMetadata = sequencesSnapshot.get(i); - if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) { + if (!publishingSequences.contains(sequenceMetadata.getSequenceName()) + && !publishedSequences.contains(sequenceMetadata.getSequenceName())) { final boolean isLast = i == (sequencesSnapshot.size() - 1); if (isLast) { // Shorten endOffsets of the last sequence to match currOffsets. @@ -1009,6 +1011,7 @@ public abstract class SeekableStreamIndexTaskRunner sequenceMetadata : sequences) { sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadBeforeReadingRecord); - if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) { + if (!sequenceMetadata.isOpen() + && !publishingSequences.contains(sequenceMetadata.getSequenceName()) + && !publishedSequences.contains(sequenceMetadata.getSequenceName())) { publishingSequences.add(sequenceMetadata.getSequenceName()); try { final Object result = driver.persist(committerSupplier.get());