Prevent KafkaSupervisor NPE in generateSequenceName (#5900) (#5902)

* Prevent KafkaSupervisor NPE in checkPendingCompletionTasks (#5900)

* throw IAE in generateSequenceName if groupId not found in taskGroups
* add null check in checkPendingCompletionTasks

* Add warn log in checkPendingCompletionTasks

* Address PR comments

Replace warn with error log

* Address PR comments

* change signature of generateSequenceName to take a TaskGroup object instead of int

* Address comments

* Remove unnecessary method from KafkaSupervisorTest
This commit is contained in:
Surekha 2018-07-04 23:45:42 -07:00 committed by Gian Merlino
parent 4cd14e8158
commit 9bece8ce1e
2 changed files with 35 additions and 37 deletions

View File

@ -143,7 +143,8 @@ public class KafkaSupervisor implements Supervisor
* time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
* map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
*/
private static class TaskGroup
@VisibleForTesting
static class TaskGroup
{
// This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
// in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@ -777,8 +778,8 @@ public class KafkaSupervisor implements Supervisor
resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
final int groupId = getTaskGroupIdForPartition(partition);
killTaskGroupForPartitions(ImmutableSet.of(partition));
sequenceTaskGroup.remove(generateSequenceName(groupId));
taskGroups.remove(groupId);
final TaskGroup removedGroup = taskGroups.remove(groupId);
sequenceTaskGroup.remove(generateSequenceName(removedGroup));
partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
});
} else {
@ -886,12 +887,13 @@ public class KafkaSupervisor implements Supervisor
}
@VisibleForTesting
String generateSequenceName(int groupId)
String generateSequenceName(TaskGroup taskGroup)
{
Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
return generateSequenceName(
taskGroups.get(groupId).partitionOffsets,
taskGroups.get(groupId).minimumMessageTime,
taskGroups.get(groupId).maximumMessageTime
taskGroup.partitionOffsets,
taskGroup.minimumMessageTime,
taskGroup.maximumMessageTime
);
}
@ -1085,18 +1087,19 @@ public class KafkaSupervisor implements Supervisor
}
return false;
} else {
final TaskGroup taskGroup = new TaskGroup(
ImmutableMap.copyOf(
kafkaTask.getIOConfig()
.getStartPartitions()
.getPartitionOffsetMap()
), kafkaTask.getIOConfig().getMinimumMessageTime(),
kafkaTask.getIOConfig().getMaximumMessageTime()
);
if (taskGroups.putIfAbsent(
taskGroupId,
new TaskGroup(
ImmutableMap.copyOf(
kafkaTask.getIOConfig()
.getStartPartitions()
.getPartitionOffsetMap()
), kafkaTask.getIOConfig().getMinimumMessageTime(),
kafkaTask.getIOConfig().getMaximumMessageTime()
)
taskGroup
) == null) {
sequenceTaskGroup.put(generateSequenceName(taskGroupId), taskGroups.get(taskGroupId));
sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
log.info("Created new task group [%d]", taskGroupId);
}
taskGroupsToVerify.add(taskGroupId);
@ -1253,7 +1256,7 @@ public class KafkaSupervisor implements Supervisor
// killing all tasks or no task left in the group ?
// clear state about the taskgroup so that get latest offset information is fetched from metadata store
log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId);
sequenceTaskGroup.remove(generateSequenceName(groupId));
sequenceTaskGroup.remove(generateSequenceName(taskGroup));
taskGroups.remove(groupId);
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
}
@ -1429,7 +1432,7 @@ public class KafkaSupervisor implements Supervisor
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
}
sequenceTaskGroup.remove(generateSequenceName(groupId));
sequenceTaskGroup.remove(generateSequenceName(group));
// remove this task group from the list of current task groups now that it has been handled
taskGroups.remove(groupId);
}
@ -1630,8 +1633,7 @@ public class KafkaSupervisor implements Supervisor
// reset partitions offsets for this task group so that they will be re-read from metadata storage
partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
sequenceTaskGroup.remove(generateSequenceName(groupId));
sequenceTaskGroup.remove(generateSequenceName(group));
// kill all the tasks in this pending completion group
killTasksInGroup(group);
// set a flag so the other pending completion groups for this set of partitions will also stop
@ -1691,7 +1693,7 @@ public class KafkaSupervisor implements Supervisor
// be recreated with the next set of offsets
if (taskData.status.isSuccess()) {
futures.add(stopTasksInGroup(taskGroup));
sequenceTaskGroup.remove(generateSequenceName(groupId));
sequenceTaskGroup.remove(generateSequenceName(taskGroup));
iTaskGroups.remove();
break;
}
@ -1724,15 +1726,16 @@ public class KafkaSupervisor implements Supervisor
DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get())
) : Optional.<DateTime>absent());
final TaskGroup taskGroup = new TaskGroup(
generateStartingOffsetsForPartitionGroup(groupId),
minimumMessageTime,
maximumMessageTime
);
taskGroups.put(
groupId,
new TaskGroup(
generateStartingOffsetsForPartitionGroup(groupId),
minimumMessageTime,
maximumMessageTime
)
taskGroup
);
sequenceTaskGroup.put(generateSequenceName(groupId), taskGroups.get(groupId));
sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId));
}
}
@ -1767,8 +1770,8 @@ public class KafkaSupervisor implements Supervisor
for (Integer partition : startPartitions.keySet()) {
endPartitions.put(partition, Long.MAX_VALUE);
}
String sequenceName = generateSequenceName(groupId);
TaskGroup group = taskGroups.get(groupId);
String sequenceName = generateSequenceName(group);
Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
@ -1929,7 +1932,7 @@ public class KafkaSupervisor implements Supervisor
String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName();
if (taskGroups.get(taskGroupId) != null) {
return generateSequenceName(taskGroupId).equals(taskSequenceName);
return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
} else {
return generateSequenceName(
((KafkaIndexTask) taskOptional.get()).getIOConfig()

View File

@ -2192,12 +2192,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
);
}
@Override
protected String generateSequenceName(int groupId)
{
return StringUtils.format("sequenceName-%d", groupId);
}
@Override
protected String generateSequenceName(
Map<Integer, Long> startPartitions,
@ -2205,7 +2199,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Optional<DateTime> maximumMessageTime
)
{
return generateSequenceName(getTaskGroupIdForPartition(startPartitions.keySet().iterator().next()));
final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next());
return StringUtils.format("sequenceName-%d", groupId);
}
}
}