Check the kafka topic when comparing checkpoints from tasks and the one stored in metastore (#6015)

Jihoon Son 2018-07-20 11:20:23 -07:00 committed by Fangjin Yang
parent 414487a78e
commit b7d42edb0f
1 changed file with 32 additions and 25 deletions
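
For context, the fix adds topic checks before offset comparisons: a supervisor's partition offsets are only meaningful for the topic it is configured with, so a checkpoint stored for a different topic must never be treated as a match. Below is a minimal sketch of that guard pattern; the class and method names are hypothetical, for illustration only, and are not Druid classes.

import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: partition offsets from two checkpoints are only
// comparable when both checkpoints refer to the same Kafka topic.
final class CheckpointGuard
{
  static boolean sameCheckpoint(
      String taskTopic,
      Map<Integer, Long> taskOffsets,
      String storedTopic,
      Map<Integer, Long> storedOffsets
  )
  {
    // A topic mismatch can never be a match, regardless of the offsets.
    if (!Objects.equals(taskTopic, storedTopic)) {
      return false;
    }
    return taskOffsets.equals(storedOffsets);
  }

  public static void main(String[] args)
  {
    System.out.println(sameCheckpoint("topicA", Map.of(0, 10L), "topicA", Map.of(0, 10L))); // true
    System.out.println(sameCheckpoint("topicA", Map.of(0, 10L), "topicB", Map.of(0, 10L))); // false
  }
}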


@@ -514,9 +514,7 @@ public class KafkaSupervisor implements Supervisor
     Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
     Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
-        ioConfig.getTopic()
-                .equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions()
-                                                                     .getTopic()),
+        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()),
         "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
         ioConfig.getTopic(),
         ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
@@ -661,6 +659,8 @@ public class KafkaSupervisor implements Supervisor
     int index = checkpoints.size();
     for (int sequenceId : checkpoints.descendingKeySet()) {
       Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
+      // We have already verified that the topic of the current checkpoint is the same as the one in ioConfig.
+      // See checkpoint().
       if (checkpoint.equals(previousCheckpoint.getKafkaPartitions().getPartitionOffsetMap())) {
         break;
       }
@@ -1183,16 +1183,22 @@ public class KafkaSupervisor implements Supervisor
       Futures.allAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
     }
     catch (Exception e) {
-      Throwables.propagate(e);
+      throw new RuntimeException(e);
     }

     final KafkaDataSourceMetadata latestDataSourceMetadata = (KafkaDataSourceMetadata) indexerMetadataStorageCoordinator
         .getDataSourceMetadata(dataSource);
-    final Map<Integer, Long> latestOffsetsFromDb = (latestDataSourceMetadata == null
-                                                    || latestDataSourceMetadata.getKafkaPartitions() == null) ? null
-                                                                                                              : latestDataSourceMetadata
-        .getKafkaPartitions()
-        .getPartitionOffsetMap();
+    final boolean hasValidOffsetsFromDb = latestDataSourceMetadata != null &&
+                                          latestDataSourceMetadata.getKafkaPartitions() != null &&
+                                          ioConfig.getTopic().equals(
+                                              latestDataSourceMetadata.getKafkaPartitions().getTopic()
+                                          );
+    final Map<Integer, Long> latestOffsetsFromDb;
+    if (hasValidOffsetsFromDb) {
+      latestOffsetsFromDb = latestDataSourceMetadata.getKafkaPartitions().getPartitionOffsetMap();
+    } else {
+      latestOffsetsFromDb = null;
+    }

     // order tasks of this taskGroup by the latest sequenceId
     taskSequences.sort((o1, o2) -> o2.rhs.firstKey().compareTo(o1.rhs.firstKey()));
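
Aside from the new topic check, the catch block in this hunk replaces Guava's Throwables.propagate(e) with a plain throw new RuntimeException(e). Both turn a checked exception into an unchecked one (propagate additionally rethrows RuntimeException and Error unchanged). A minimal standalone sketch of the wrapping pattern, with illustrative names:

import java.util.concurrent.Callable;

// Illustrative only: the wrapping pattern used in the hunk above.
final class UncheckedCall
{
  static <T> T callUnchecked(Callable<T> task)
  {
    try {
      return task.call();
    }
    catch (Exception e) {
      // Checked exceptions surface as unchecked, preserving the cause.
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(callUnchecked(() -> 21 + 21)); // prints 42
  }
}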
@@ -1203,22 +1209,21 @@ public class KafkaSupervisor implements Supervisor
     while (taskIndex < taskSequences.size()) {
       if (earliestConsistentSequenceId.get() == -1) {
-        // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata store
+        // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata
+        // store
         if (taskSequences.get(taskIndex).rhs.entrySet().stream().anyMatch(
             sequenceCheckpoint -> sequenceCheckpoint.getValue().entrySet().stream().allMatch(
                 partitionOffset -> Longs.compare(
                     partitionOffset.getValue(),
-                    latestOffsetsFromDb == null
-                    ?
-                    partitionOffset.getValue()
-                    : latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
+                    latestOffsetsFromDb == null ?
+                    partitionOffset.getValue() :
+                    latestOffsetsFromDb.getOrDefault(partitionOffset.getKey(), partitionOffset.getValue())
                 ) == 0) && earliestConsistentSequenceId.compareAndSet(-1, sequenceCheckpoint.getKey())) || (
             pendingCompletionTaskGroups.getOrDefault(groupId, EMPTY_LIST).size() > 0
             && earliestConsistentSequenceId.compareAndSet(-1, taskSequences.get(taskIndex).rhs.firstKey()))) {
-          final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(taskSequences.get(taskIndex).rhs
-              .tailMap(
-                  earliestConsistentSequenceId
-                      .get()));
+          final SortedMap<Integer, Map<Integer, Long>> latestCheckpoints = new TreeMap<>(
+              taskSequences.get(taskIndex).rhs.tailMap(earliestConsistentSequenceId.get())
+          );
           log.info("Setting taskGroup sequences to [%s] for group [%d]", latestCheckpoints, groupId);
           taskGroup.sequenceOffsets.clear();
           taskGroup.sequenceOffsets.putAll(latestCheckpoints);
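
Tying this hunk to the previous one: when hasValidOffsetsFromDb is false (no stored metadata, or a topic mismatch), latestOffsetsFromDb stays null, and the comparison above then falls back to each task's own offset via getOrDefault, so the consistency check passes trivially. A self-contained sketch of that fallback rule, with illustrative names:

import java.util.Map;

// Illustrative only: mirrors the latestOffsetsFromDb fallback in the hunk above.
final class OffsetFallbackDemo
{
  static boolean consistentWithDb(Map<Integer, Long> latestOffsetsFromDb, int partition, long taskOffset)
  {
    // With no valid DB offsets (null map, e.g. after a topic mismatch),
    // the task offset is compared against itself and always matches.
    final long reference = latestOffsetsFromDb == null
                           ? taskOffset
                           : latestOffsetsFromDb.getOrDefault(partition, taskOffset);
    return Long.compare(taskOffset, reference) == 0;
  }

  public static void main(String[] args)
  {
    System.out.println(consistentWithDb(null, 0, 100L));            // true: no DB reference
    System.out.println(consistentWithDb(Map.of(0, 100L), 0, 100L)); // true: offsets agree
    System.out.println(consistentWithDb(Map.of(0, 50L), 0, 100L));  // false: offsets diverge
  }
}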
@@ -1262,7 +1267,8 @@ public class KafkaSupervisor implements Supervisor
     taskSequences.stream().filter(taskIdSequences -> tasksToKill.contains(taskIdSequences.lhs)).forEach(
         sequenceCheckpoint -> {
           log.warn(
-              "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest persisted offsets in metadata store [%s]",
+              "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest "
+              + "persisted offsets in metadata store [%s]",
               sequenceCheckpoint.lhs,
               sequenceCheckpoint.rhs,
               taskGroup.sequenceOffsets,
@@ -1270,7 +1276,8 @@ public class KafkaSupervisor implements Supervisor
           );
           killTask(sequenceCheckpoint.lhs);
           taskGroup.tasks.remove(sequenceCheckpoint.lhs);
-        });
+        }
+    );
   }

   private void addDiscoveredTaskToPendingCompletionTaskGroups(
@@ -1849,7 +1856,7 @@ public class KafkaSupervisor implements Supervisor
   private long getOffsetFromStorageForPartition(int partition)
   {
     long offset;
-    Map<Integer, Long> metadataOffsets = getOffsetsFromMetadataStorage();
+    final Map<Integer, Long> metadataOffsets = getOffsetsFromMetadataStorage();
     if (metadataOffsets.get(partition) != null) {
       offset = metadataOffsets.get(partition);
       log.debug("Getting offset [%,d] from metadata storage for partition [%d]", offset, partition);
@@ -1877,8 +1884,8 @@ public class KafkaSupervisor implements Supervisor
   private Map<Integer, Long> getOffsetsFromMetadataStorage()
   {
-    DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
-    if (dataSourceMetadata != null && dataSourceMetadata instanceof KafkaDataSourceMetadata) {
+    final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
+    if (dataSourceMetadata instanceof KafkaDataSourceMetadata) {
       KafkaPartitions partitions = ((KafkaDataSourceMetadata) dataSourceMetadata).getKafkaPartitions();
       if (partitions != null) {
         if (!ioConfig.getTopic().equals(partitions.getTopic())) {
@@ -1887,14 +1894,14 @@ public class KafkaSupervisor implements Supervisor
               partitions.getTopic(),
               ioConfig.getTopic()
           );
-          return ImmutableMap.of();
+          return Collections.emptyMap();
         } else if (partitions.getPartitionOffsetMap() != null) {
           return partitions.getPartitionOffsetMap();
         }
       }
     }
-    return ImmutableMap.of();
+    return Collections.emptyMap();
   }

   private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOffset)
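
One small cleanup in the final two hunks leans on a Java language guarantee: x instanceof T evaluates to false when x is null, so the explicit dataSourceMetadata != null test was redundant. A tiny demonstration:

final class InstanceofNullDemo
{
  public static void main(String[] args)
  {
    final Object dataSourceMetadata = null;
    // instanceof already rejects null, so "x != null && x instanceof T"
    // reduces to plain "x instanceof T".
    System.out.println(dataSourceMetadata instanceof String); // prints false
  }
}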