Add test to verify sequence name of Kafka task (#15397)

* Add test to verify sequence name of Kafka and Kinesis tasks
This commit is contained in:
Kashif Faraz 2023-11-21 10:17:32 +05:30 committed by GitHub
parent 3fa856b3ff
commit 4ba3cf5221
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 178 additions and 9 deletions

View File

@ -3966,7 +3966,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
DateTime minMessageTime = DateTimes.nowUtc();
DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
KafkaSupervisor supervisor = getSupervisor(
KafkaSupervisor supervisor = createSupervisor(
2,
1,
true,
@ -4143,7 +4143,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
replayAll();
Assert.assertTrue(supervisor.isTaskCurrent(42, "id0", taskMap));
Assert.assertTrue(supervisor.isTaskCurrent(42, "id1", taskMap));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id2", taskMap));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id3", taskMap));
@ -4151,6 +4150,106 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
@Test
public void testSequenceNameDoesNotChangeWithTaskId()
{
final DateTime minMessageTime = DateTimes.nowUtc();
final DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
KafkaSupervisor supervisor = createSupervisor(
2,
1,
true,
"PT1H",
new Period("P1D"),
new Period("P1D"),
false,
kafkaHost,
dataSchema,
new KafkaSupervisorTuningConfig(
null,
1000,
null,
null,
50000,
null,
new Period("P1Y"),
null,
null,
null,
false,
null,
false,
null,
numThreads,
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT,
null,
null,
null,
null,
null
)
);
// Create task1 with some start and end offsets
final KafkaIndexTask task1 = createKafkaIndexTask(
"id0",
0,
new SeekableStreamStartSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, 0L, 2, 0L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
minMessageTime,
maxMessageTime,
dataSchema,
supervisor.getTuningConfig()
);
// Create task2 with same offsets
final KafkaIndexTask task2 = createKafkaIndexTask(
"id1",
0,
task1.getIOConfig().getStartSequenceNumbers(),
task1.getIOConfig().getEndSequenceNumbers(),
task1.getIOConfig().getMinimumMessageTime().get(),
task1.getIOConfig().getMaximumMessageTime().get(),
dataSchema,
supervisor.getTuningConfig()
);
replayAll();
final String sequenceTask1 = supervisor.generateSequenceName(
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
task1.getIOConfig().getMinimumMessageTime(),
task1.getIOConfig().getMaximumMessageTime(),
task1.getDataSchema(),
task1.getTuningConfig()
);
Assert.assertNotNull(sequenceTask1);
final String sequenceTask2 = supervisor.generateSequenceName(
task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
task2.getIOConfig().getMinimumMessageTime(),
task2.getIOConfig().getMaximumMessageTime(),
task2.getDataSchema(),
task2.getTuningConfig()
);
Assert.assertNotNull(sequenceTask2);
Assert.assertNotEquals(task1.getId(), task2.getId());
Assert.assertEquals(sequenceTask1, sequenceTask2);
verifyAll();
}
@Test
public void testResumeAllActivelyReadingTasks() throws Exception
{
@ -4711,8 +4810,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
/**
* Use when you don't want generateSequenceNumber overridden
*/
private KafkaSupervisor getSupervisor(
private KafkaSupervisor createSupervisor(
int replicas,
int taskCount,
boolean useEarliestOffset,
@ -4992,7 +5090,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
@Override
protected String generateSequenceName(
public String generateSequenceName(
Map<KafkaTopicPartition, Long> startPartitions,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime,

View File

@ -3922,7 +3922,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
DateTime minMessageTime = DateTimes.nowUtc();
DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
KinesisSupervisor supervisor = getSupervisor(
KinesisSupervisor supervisor = createSupervisor(
1,
1,
true,
@ -4066,6 +4066,77 @@ public class KinesisSupervisorTest extends EasyMockSupport
verifyAll();
}
@Test
public void testSequenceNameDoesNotChangeWithTaskId()
{
final DateTime minMessageTime = DateTimes.nowUtc();
final DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
KinesisSupervisor supervisor = createSupervisor(
1,
1,
true,
"PT1H",
new Period("P1D"),
new Period("P1D"),
false,
42,
42,
dataSchema,
tuningConfig
);
// Create task1 with some start and end offsets
final KinesisIndexTask task1 = createKinesisIndexTask(
"id0",
0,
new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "3"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER)
),
minMessageTime,
maxMessageTime,
dataSchema
);
// Create task2 with same offsets
final KinesisIndexTask task2 = createKinesisIndexTask(
"id1",
0,
task1.getIOConfig().getStartSequenceNumbers(),
task1.getIOConfig().getEndSequenceNumbers(),
task1.getIOConfig().getMinimumMessageTime().get(),
task1.getIOConfig().getMaximumMessageTime().get(),
dataSchema
);
replayAll();
final String sequenceTask1 = supervisor.generateSequenceName(
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
task1.getIOConfig().getMinimumMessageTime(),
task1.getIOConfig().getMaximumMessageTime(),
task1.getDataSchema(),
task1.getTuningConfig()
);
Assert.assertNotNull(sequenceTask1);
final String sequenceTask2 = supervisor.generateSequenceName(
task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
task2.getIOConfig().getMinimumMessageTime(),
task2.getIOConfig().getMaximumMessageTime(),
task2.getDataSchema(),
task2.getTuningConfig()
);
Assert.assertNotNull(sequenceTask2);
Assert.assertNotEquals(task1.getId(), task2.getId());
Assert.assertEquals(sequenceTask1, sequenceTask2);
verifyAll();
}
@Test
public void testShardSplit() throws Exception
{
@ -5317,7 +5388,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
/**
* Use for tests where you don't want generateSequenceName to be overridden out
*/
private KinesisSupervisor getSupervisor(
private KinesisSupervisor createSupervisor(
int replicas,
int taskCount,
boolean useEarliestOffset,
@ -5572,7 +5643,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Override
protected String generateSequenceName(
public String generateSequenceName(
Map<String, String> startPartitions,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime,

View File

@ -2608,7 +2608,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@VisibleForTesting
protected String generateSequenceName(
public String generateSequenceName(
Map<PartitionIdType, SequenceOffsetType> startPartitions,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime,