mirror of https://github.com/apache/druid.git
Fix NPE in KafkaSupervisor.checkpointTaskGroup (#6206)
* Fix NPE in KafkaSupervisor.checkpointTaskGroup * address comments * address comment
This commit is contained in:
parent
0172326c62
commit
bda5a8a95e
|
@ -146,6 +146,8 @@ public class KafkaSupervisor implements Supervisor
|
|||
*/
|
||||
private class TaskGroup
|
||||
{
|
||||
final int groupId;
|
||||
|
||||
// This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
|
||||
// in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
|
||||
// this task group has completed successfully, at which point this will be destroyed and a new task group will be
|
||||
|
@ -161,11 +163,13 @@ public class KafkaSupervisor implements Supervisor
|
|||
final String baseSequenceName;
|
||||
|
||||
TaskGroup(
|
||||
int groupId,
|
||||
ImmutableMap<Integer, Long> partitionOffsets,
|
||||
Optional<DateTime> minimumMessageTime,
|
||||
Optional<DateTime> maximumMessageTime
|
||||
)
|
||||
{
|
||||
this.groupId = groupId;
|
||||
this.partitionOffsets = partitionOffsets;
|
||||
this.minimumMessageTime = minimumMessageTime;
|
||||
this.maximumMessageTime = maximumMessageTime;
|
||||
|
@ -187,9 +191,21 @@ public class KafkaSupervisor implements Supervisor
|
|||
|
||||
private static class TaskData
|
||||
{
|
||||
@Nullable
|
||||
volatile TaskStatus status;
|
||||
@Nullable
|
||||
volatile DateTime startTime;
|
||||
volatile Map<Integer, Long> currentOffsets = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "TaskData{" +
|
||||
"status=" + status +
|
||||
", startTime=" + startTime +
|
||||
", currentOffsets=" + currentOffsets +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
// Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class
|
||||
|
@ -718,8 +734,8 @@ public class KafkaSupervisor implements Supervisor
|
|||
log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
|
||||
return;
|
||||
}
|
||||
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
|
||||
taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
|
||||
final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
|
||||
taskGroup.addNewCheckpoint(newCheckpoint);
|
||||
log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
|
||||
}
|
||||
}
|
||||
|
@ -785,10 +801,13 @@ public class KafkaSupervisor implements Supervisor
|
|||
: currentMetadata.getKafkaPartitions()
|
||||
.getPartitionOffsetMap()
|
||||
.get(resetPartitionOffset.getKey());
|
||||
final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
|
||||
if (partitionOffsetInMetadataStore != null ||
|
||||
(partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
|
||||
.equals(resetPartitionOffset.getValue()))) {
|
||||
final TaskGroup partitionTaskGroup = taskGroups.get(
|
||||
getTaskGroupIdForPartition(resetPartitionOffset.getKey())
|
||||
);
|
||||
final boolean isSameOffset = partitionTaskGroup != null
|
||||
&& partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
|
||||
.equals(resetPartitionOffset.getValue());
|
||||
if (partitionOffsetInMetadataStore != null || isSameOffset) {
|
||||
doReset = true;
|
||||
break;
|
||||
}
|
||||
|
@ -1012,7 +1031,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
List<String> futureTaskIds = Lists.newArrayList();
|
||||
List<ListenableFuture<Boolean>> futures = Lists.newArrayList();
|
||||
List<Task> tasks = taskStorage.getActiveTasks();
|
||||
final Set<Integer> taskGroupsToVerify = new HashSet<>();
|
||||
final Map<Integer, TaskGroup> taskGroupsToVerify = new HashMap<>();
|
||||
|
||||
for (Task task : tasks) {
|
||||
if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) {
|
||||
|
@ -1119,6 +1138,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
k -> {
|
||||
log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
|
||||
return new TaskGroup(
|
||||
taskGroupId,
|
||||
ImmutableMap.copyOf(
|
||||
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
|
||||
),
|
||||
|
@ -1127,8 +1147,15 @@ public class KafkaSupervisor implements Supervisor
|
|||
);
|
||||
}
|
||||
);
|
||||
taskGroupsToVerify.add(taskGroupId);
|
||||
taskGroup.tasks.putIfAbsent(taskId, new TaskData());
|
||||
taskGroupsToVerify.put(taskGroupId, taskGroup);
|
||||
final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData());
|
||||
if (prevTaskGroup != null) {
|
||||
throw new ISE(
|
||||
"WTH? a taskGroup[%s] already exists for new task[%s]",
|
||||
prevTaskGroup,
|
||||
taskId
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
@ -1156,7 +1183,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource);
|
||||
|
||||
// make sure the checkpoints are consistent with each other and with the metadata store
|
||||
taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints);
|
||||
taskGroupsToVerify.values().forEach(this::verifyAndMergeCheckpoints);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1166,10 +1193,9 @@ public class KafkaSupervisor implements Supervisor
|
|||
* 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
|
||||
* created tasks for the taskGroup start indexing from after the latest published offsets.
|
||||
*/
|
||||
private void verifyAndMergeCheckpoints(final Integer groupId)
|
||||
private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
|
||||
{
|
||||
final TaskGroup taskGroup = taskGroups.get(groupId);
|
||||
|
||||
final int groupId = taskGroup.groupId;
|
||||
// List<TaskId, Map -> {SequenceId, Checkpoints}>
|
||||
final List<Pair<String, TreeMap<Integer, Map<Integer, Long>>>> taskSequences = new CopyOnWriteArrayList<>();
|
||||
final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
|
||||
|
@ -1330,6 +1356,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
// reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot
|
||||
// change to a state where it will read any more events
|
||||
TaskGroup newTaskGroup = new TaskGroup(
|
||||
groupId,
|
||||
ImmutableMap.copyOf(startingPartitions),
|
||||
Optional.absent(),
|
||||
Optional.absent()
|
||||
|
@ -1367,8 +1394,8 @@ public class KafkaSupervisor implements Supervisor
|
|||
}
|
||||
|
||||
taskData.startTime = startTime;
|
||||
long millisRemaining = ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis()
|
||||
- taskData.startTime.getMillis());
|
||||
long millisRemaining = ioConfig.getTaskDuration().getMillis() -
|
||||
(System.currentTimeMillis() - taskData.startTime.getMillis());
|
||||
if (millisRemaining > 0) {
|
||||
scheduledExec.schedule(
|
||||
buildRunTask(),
|
||||
|
@ -1421,7 +1448,8 @@ public class KafkaSupervisor implements Supervisor
|
|||
// find the longest running task from this group
|
||||
DateTime earliestTaskStart = DateTimes.nowUtc();
|
||||
for (TaskData taskData : group.tasks.values()) {
|
||||
if (earliestTaskStart.isAfter(taskData.startTime)) {
|
||||
// startTime can be null if kafkaSupervisor is stopped gracefully before processing any runNotice
|
||||
if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
|
||||
earliestTaskStart = taskData.startTime;
|
||||
}
|
||||
}
|
||||
|
@ -1430,7 +1458,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
|
||||
log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
|
||||
futureGroupIds.add(groupId);
|
||||
futures.add(checkpointTaskGroup(groupId, true));
|
||||
futures.add(checkpointTaskGroup(group, true));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1468,10 +1496,8 @@ public class KafkaSupervisor implements Supervisor
|
|||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final int groupId, final boolean finalize)
|
||||
private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup taskGroup, final boolean finalize)
|
||||
{
|
||||
final TaskGroup taskGroup = taskGroups.get(groupId);
|
||||
|
||||
if (finalize) {
|
||||
// 1) Check if any task completed (in which case we're done) and kill unassigned tasks
|
||||
Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
|
||||
|
@ -1480,30 +1506,33 @@ public class KafkaSupervisor implements Supervisor
|
|||
String taskId = taskEntry.getKey();
|
||||
TaskData task = taskEntry.getValue();
|
||||
|
||||
if (task.status.isSuccess()) {
|
||||
// If any task in this group has already completed, stop the rest of the tasks in the group and return.
|
||||
// This will cause us to create a new set of tasks next cycle that will start from the offsets in
|
||||
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
|
||||
// failed and we need to re-ingest)
|
||||
return Futures.transform(
|
||||
stopTasksInGroup(taskGroup),
|
||||
new Function<Object, Map<Integer, Long>>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Map<Integer, Long> apply(@Nullable Object input)
|
||||
// task.status can be null if kafkaSupervisor is stopped gracefully before processing any runNotice.
|
||||
if (task.status != null) {
|
||||
if (task.status.isSuccess()) {
|
||||
// If any task in this group has already completed, stop the rest of the tasks in the group and return.
|
||||
// This will cause us to create a new set of tasks next cycle that will start from the offsets in
|
||||
// metadata store (which will have advanced if we succeeded in publishing and will remain the same if
|
||||
// publishing failed and we need to re-ingest)
|
||||
return Futures.transform(
|
||||
stopTasksInGroup(taskGroup),
|
||||
new Function<Object, Map<Integer, Long>>()
|
||||
{
|
||||
return null;
|
||||
@Nullable
|
||||
@Override
|
||||
public Map<Integer, Long> apply(@Nullable Object input)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
if (task.status.isRunnable()) {
|
||||
if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
|
||||
log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
|
||||
killTask(taskId);
|
||||
i.remove();
|
||||
if (task.status.isRunnable()) {
|
||||
if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
|
||||
log.info("Killing task [%s] which hasn't been assigned to a worker", taskId);
|
||||
killTask(taskId);
|
||||
i.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1550,7 +1579,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
final List<String> setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
|
||||
|
||||
if (setEndOffsetTaskIds.isEmpty()) {
|
||||
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
|
||||
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -1561,11 +1590,15 @@ public class KafkaSupervisor implements Supervisor
|
|||
"Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]",
|
||||
endOffsets,
|
||||
taskGroup.sequenceOffsets.lastEntry().getValue(),
|
||||
groupId
|
||||
taskGroup.groupId
|
||||
);
|
||||
}
|
||||
|
||||
log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
|
||||
log.info(
|
||||
"Setting endOffsets for tasks in taskGroup [%d] to %s and resuming",
|
||||
taskGroup.groupId,
|
||||
endOffsets
|
||||
);
|
||||
for (final String taskId : setEndOffsetTaskIds) {
|
||||
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
|
||||
}
|
||||
|
@ -1587,7 +1620,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
}
|
||||
|
||||
if (taskGroup.tasks.isEmpty()) {
|
||||
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId);
|
||||
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -1627,11 +1660,15 @@ public class KafkaSupervisor implements Supervisor
|
|||
continue;
|
||||
}
|
||||
|
||||
Iterator<Map.Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
|
||||
Iterator<Entry<String, TaskData>> iTask = group.tasks.entrySet().iterator();
|
||||
while (iTask.hasNext()) {
|
||||
Map.Entry<String, TaskData> task = iTask.next();
|
||||
final Entry<String, TaskData> entry = iTask.next();
|
||||
final String taskId = entry.getKey();
|
||||
final TaskData taskData = entry.getValue();
|
||||
|
||||
if (task.getValue().status.isFailure()) {
|
||||
Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
|
||||
|
||||
if (taskData.status.isFailure()) {
|
||||
iTask.remove(); // remove failed task
|
||||
if (group.tasks.isEmpty()) {
|
||||
// if all tasks in the group have failed, just nuke all task groups with this partition set and restart
|
||||
|
@ -1640,10 +1677,10 @@ public class KafkaSupervisor implements Supervisor
|
|||
}
|
||||
}
|
||||
|
||||
if (task.getValue().status.isSuccess()) {
|
||||
if (taskData.status.isSuccess()) {
|
||||
// If one of the pending completion tasks was successful, stop the rest of the tasks in the group as
|
||||
// we no longer need them to publish their segment.
|
||||
log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds());
|
||||
log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds());
|
||||
futures.add(stopTasksInGroup(group));
|
||||
foundSuccess = true;
|
||||
toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups
|
||||
|
@ -1714,6 +1751,8 @@ public class KafkaSupervisor implements Supervisor
|
|||
continue;
|
||||
}
|
||||
|
||||
Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId);
|
||||
|
||||
// remove failed tasks
|
||||
if (taskData.status.isFailure()) {
|
||||
iTasks.remove();
|
||||
|
@ -1741,7 +1780,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
taskGroups.entrySet()
|
||||
.stream()
|
||||
.filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas())
|
||||
.forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey()));
|
||||
.forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getValue()));
|
||||
|
||||
// check that there is a current task group for each group of partitions in [partitionGroups]
|
||||
for (Integer groupId : partitionGroups.keySet()) {
|
||||
|
@ -1757,6 +1796,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
) : Optional.absent());
|
||||
|
||||
final TaskGroup taskGroup = new TaskGroup(
|
||||
groupId,
|
||||
generateStartingOffsetsForPartitionGroup(groupId),
|
||||
minimumMessageTime,
|
||||
maximumMessageTime
|
||||
|
@ -1984,8 +2024,12 @@ public class KafkaSupervisor implements Supervisor
|
|||
|
||||
final List<ListenableFuture<Void>> futures = Lists.newArrayList();
|
||||
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
|
||||
if (!entry.getValue().status.isComplete()) {
|
||||
futures.add(stopTask(entry.getKey(), false));
|
||||
final String taskId = entry.getKey();
|
||||
final TaskData taskData = entry.getValue();
|
||||
if (taskData.status == null) {
|
||||
killTask(taskId);
|
||||
} else if (!taskData.status.isComplete()) {
|
||||
futures.add(stopTask(taskId, false));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2066,7 +2110,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
for (TaskGroup taskGroup : taskGroups.values()) {
|
||||
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
|
||||
String taskId = entry.getKey();
|
||||
DateTime startTime = entry.getValue().startTime;
|
||||
@Nullable DateTime startTime = entry.getValue().startTime;
|
||||
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
|
||||
Long remainingSeconds = null;
|
||||
if (startTime != null) {
|
||||
|
@ -2093,7 +2137,7 @@ public class KafkaSupervisor implements Supervisor
|
|||
for (TaskGroup taskGroup : taskGroups) {
|
||||
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
|
||||
String taskId = entry.getKey();
|
||||
DateTime startTime = entry.getValue().startTime;
|
||||
@Nullable DateTime startTime = entry.getValue().startTime;
|
||||
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
|
||||
Long remainingSeconds = null;
|
||||
if (taskGroup.completionTimeout != null) {
|
||||
|
|
|
@ -45,7 +45,7 @@ public class TaskReportData
|
|||
String id,
|
||||
@Nullable Map<Integer, Long> startingOffsets,
|
||||
@Nullable Map<Integer, Long> currentOffsets,
|
||||
DateTime startTime,
|
||||
@Nullable DateTime startTime,
|
||||
Long remainingSeconds,
|
||||
TaskType type,
|
||||
@Nullable Map<Integer, Long> lag
|
||||
|
|
Loading…
Reference in New Issue