diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 04d3802a66c..b7b389668a4 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2127,6 +2127,49 @@ public class KafkaIndexTaskTest Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } + @Test(timeout = 60_000L) + public void testRunWithDuplicateRequest() throws Exception + { + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : records) { + kafkaProducer.send(record).get(); + } + } + + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)), + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + false + ) + ); + + runTask(task); + + while (!task.getRunner().getStatus().equals(Status.READING)) { + Thread.sleep(20); + } + + // first setEndOffsets request + task.getRunner().pause(); + task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true); + Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + + // duplicate setEndOffsets request + task.getRunner().pause(); + task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true); + Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + } + private ListenableFuture runTask(final Task task) { try { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 857264965f5..c86a2b509c0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1403,6 +1403,7 @@ public abstract class SeekableStreamIndexTaskRunner