diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 2ff8c5bf91e..3087bdcbb1a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -3966,7 +3966,7 @@ public class KafkaSupervisorTest extends EasyMockSupport DateTime minMessageTime = DateTimes.nowUtc(); DateTime maxMessageTime = DateTimes.nowUtc().plus(10000); - KafkaSupervisor supervisor = getSupervisor( + KafkaSupervisor supervisor = createSupervisor( 2, 1, true, @@ -4143,7 +4143,6 @@ public class KafkaSupervisorTest extends EasyMockSupport replayAll(); Assert.assertTrue(supervisor.isTaskCurrent(42, "id0", taskMap)); - Assert.assertTrue(supervisor.isTaskCurrent(42, "id1", taskMap)); Assert.assertFalse(supervisor.isTaskCurrent(42, "id2", taskMap)); Assert.assertFalse(supervisor.isTaskCurrent(42, "id3", taskMap)); @@ -4151,6 +4150,106 @@ public class KafkaSupervisorTest extends EasyMockSupport verifyAll(); } + @Test + public void testSequenceNameDoesNotChangeWithTaskId() + { + final DateTime minMessageTime = DateTimes.nowUtc(); + final DateTime maxMessageTime = DateTimes.nowUtc().plus(10000); + + KafkaSupervisor supervisor = createSupervisor( + 2, + 1, + true, + "PT1H", + new Period("P1D"), + new Period("P1D"), + false, + kafkaHost, + dataSchema, + new KafkaSupervisorTuningConfig( + null, + 1000, + null, + null, + 50000, + null, + new Period("P1Y"), + null, + null, + null, + false, + null, + false, + null, + numThreads, + TEST_CHAT_RETRIES, + TEST_HTTP_TIMEOUT, + TEST_SHUTDOWN_TIMEOUT, + null, + null, + null, + null, + null + ) + ); + + // Create task1 with some start and end offsets + final KafkaIndexTask task1 = createKafkaIndexTask( + "id0", + 0, + new SeekableStreamStartSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, 0L, 2, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + "topic", + singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), + minMessageTime, + maxMessageTime, + dataSchema, + supervisor.getTuningConfig() + ); + + // Create task2 with same offsets + final KafkaIndexTask task2 = createKafkaIndexTask( + "id1", + 0, + task1.getIOConfig().getStartSequenceNumbers(), + task1.getIOConfig().getEndSequenceNumbers(), + task1.getIOConfig().getMinimumMessageTime().get(), + task1.getIOConfig().getMaximumMessageTime().get(), + dataSchema, + supervisor.getTuningConfig() + ); + + replayAll(); + + final String sequenceTask1 = supervisor.generateSequenceName( + task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(), + task1.getIOConfig().getMinimumMessageTime(), + task1.getIOConfig().getMaximumMessageTime(), + task1.getDataSchema(), + task1.getTuningConfig() + ); + Assert.assertNotNull(sequenceTask1); + + final String sequenceTask2 = supervisor.generateSequenceName( + task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(), + task2.getIOConfig().getMinimumMessageTime(), + task2.getIOConfig().getMaximumMessageTime(), + task2.getDataSchema(), + task2.getTuningConfig() + ); + Assert.assertNotNull(sequenceTask2); + + Assert.assertNotEquals(task1.getId(), task2.getId()); + Assert.assertEquals(sequenceTask1, sequenceTask2); + + verifyAll(); + } + @Test public void testResumeAllActivelyReadingTasks() throws Exception { @@ -4711,8 +4810,7 @@ public class KafkaSupervisorTest extends EasyMockSupport /** * Use when you don't want generateSequenceNumber overridden */ - - private KafkaSupervisor getSupervisor( + private KafkaSupervisor createSupervisor( int replicas, int taskCount, boolean useEarliestOffset, @@ -4992,7 +5090,7 @@ public class KafkaSupervisorTest extends EasyMockSupport } @Override - protected String generateSequenceName( + public String generateSequenceName( Map startPartitions, Optional minimumMessageTime, Optional maximumMessageTime, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 39d943dbebe..303eb5eff38 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -3922,7 +3922,7 @@ public class KinesisSupervisorTest extends EasyMockSupport DateTime minMessageTime = DateTimes.nowUtc(); DateTime maxMessageTime = DateTimes.nowUtc().plus(10000); - KinesisSupervisor supervisor = getSupervisor( + KinesisSupervisor supervisor = createSupervisor( 1, 1, true, @@ -4066,6 +4066,77 @@ public class KinesisSupervisorTest extends EasyMockSupport verifyAll(); } + @Test + public void testSequenceNameDoesNotChangeWithTaskId() + { + final DateTime minMessageTime = DateTimes.nowUtc(); + final DateTime maxMessageTime = DateTimes.nowUtc().plus(10000); + + KinesisSupervisor supervisor = createSupervisor( + 1, + 1, + true, + "PT1H", + new Period("P1D"), + new Period("P1D"), + false, + 42, + 42, + dataSchema, + tuningConfig + ); + + // Create task1 with some start and end offsets + final KinesisIndexTask task1 = createKinesisIndexTask( + "id0", + 0, + new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "3"), ImmutableSet.of()), + new SeekableStreamEndSequenceNumbers<>( + "stream", + ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER) + ), + minMessageTime, + maxMessageTime, + dataSchema + ); + + // Create task2 with same offsets + final KinesisIndexTask task2 = createKinesisIndexTask( + "id1", + 0, + task1.getIOConfig().getStartSequenceNumbers(), + task1.getIOConfig().getEndSequenceNumbers(), + task1.getIOConfig().getMinimumMessageTime().get(), + task1.getIOConfig().getMaximumMessageTime().get(), + dataSchema + ); + + replayAll(); + + final String sequenceTask1 = supervisor.generateSequenceName( + task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(), + task1.getIOConfig().getMinimumMessageTime(), + task1.getIOConfig().getMaximumMessageTime(), + task1.getDataSchema(), + task1.getTuningConfig() + ); + Assert.assertNotNull(sequenceTask1); + + final String sequenceTask2 = supervisor.generateSequenceName( + task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(), + task2.getIOConfig().getMinimumMessageTime(), + task2.getIOConfig().getMaximumMessageTime(), + task2.getDataSchema(), + task2.getTuningConfig() + ); + Assert.assertNotNull(sequenceTask2); + + Assert.assertNotEquals(task1.getId(), task2.getId()); + Assert.assertEquals(sequenceTask1, sequenceTask2); + + verifyAll(); + } + @Test public void testShardSplit() throws Exception { @@ -5317,7 +5388,7 @@ public class KinesisSupervisorTest extends EasyMockSupport /** * Use for tests where you don't want generateSequenceName to be overridden out */ - private KinesisSupervisor getSupervisor( + private KinesisSupervisor createSupervisor( int replicas, int taskCount, boolean useEarliestOffset, @@ -5572,7 +5643,7 @@ public class KinesisSupervisorTest extends EasyMockSupport } @Override - protected String generateSequenceName( + public String generateSequenceName( Map startPartitions, Optional minimumMessageTime, Optional maximumMessageTime, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index a0ec6d809eb..906f7155665 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2608,7 +2608,7 @@ public abstract class SeekableStreamSupervisor startPartitions, Optional minimumMessageTime, Optional maximumMessageTime,