fix kafka index task doesn't resume when recieve duplicate request (#6990)

* fix kafka index task doesn't resume when recieve duplicate request

* add unit test
This commit is contained in:
Mingming Qiu 2019-02-13 05:24:28 +08:00 committed by Clint Wylie
parent 8ba11591b6
commit d0abf5c20a
2 changed files with 44 additions and 0 deletions

View File

@ -2127,6 +2127,49 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
public void testRunWithDuplicateRequest() throws Exception
{
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : records) {
kafkaProducer.send(record).get();
}
}
final KafkaIndexTask task = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
false
)
);
runTask(task);
while (!task.getRunner().getStatus().equals(Status.READING)) {
Thread.sleep(20);
}
// first setEndOffsets request
task.getRunner().pause();
task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true);
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
// duplicate setEndOffsets request
task.getRunner().pause();
task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true);
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
}
private ListenableFuture<TaskStatus> runTask(final Task task)
{
try {

View File

@ -1403,6 +1403,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
exclusivePartitions) && !finish) ||
(latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers);
resume();
return Response.ok(sequenceNumbers).build();
} else if (latestSequence.isCheckpointed()) {
return Response.status(Response.Status.BAD_REQUEST)