kafkasupervisor checkpointing bug (#6639)

This commit is contained in:
Joshua Sun 2018-11-16 16:09:55 -08:00 committed by Gian Merlino
parent c4cb4b4909
commit 7928e81e3f
1 changed files with 15 additions and 12 deletions

View File

@ -669,18 +669,21 @@ public class KafkaSupervisor implements Supervisor
})
.findAny()
.map(Entry::getKey);
taskGroupId = maybeGroupId.orElse(
pendingCompletionTaskGroups
.entrySet()
.stream()
.filter(entry -> {
final List<TaskGroup> taskGroups = entry.getValue();
return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
})
.findAny()
.orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
.getKey()
);
if (maybeGroupId.isPresent()) {
taskGroupId = maybeGroupId.get();
} else {
taskGroupId = pendingCompletionTaskGroups
.entrySet()
.stream()
.filter(entry -> {
final List<TaskGroup> taskGroups = entry.getValue();
return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
})
.findAny()
.orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
.getKey();
}
} else {
taskGroupId = nullableTaskGroupId;
}