diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index d8492830a9b..39b19ee7371 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -143,7 +143,8 @@ public class KafkaSupervisor implements Supervisor * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups] * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]). */ - private static class TaskGroup + @VisibleForTesting + static class TaskGroup { // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in @@ -777,8 +778,8 @@ public class KafkaSupervisor implements Supervisor resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> { final int groupId = getTaskGroupIdForPartition(partition); killTaskGroupForPartitions(ImmutableSet.of(partition)); - sequenceTaskGroup.remove(generateSequenceName(groupId)); - taskGroups.remove(groupId); + final TaskGroup removedGroup = taskGroups.remove(groupId); + sequenceTaskGroup.remove(generateSequenceName(removedGroup)); partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET); }); } else { @@ -886,12 +887,13 @@ public class KafkaSupervisor implements Supervisor } @VisibleForTesting - String generateSequenceName(int groupId) + String generateSequenceName(TaskGroup taskGroup) { + Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null"); return generateSequenceName( - taskGroups.get(groupId).partitionOffsets, - taskGroups.get(groupId).minimumMessageTime, - taskGroups.get(groupId).maximumMessageTime + taskGroup.partitionOffsets, + taskGroup.minimumMessageTime, + taskGroup.maximumMessageTime ); } @@ -1085,18 +1087,19 @@ public class KafkaSupervisor implements Supervisor } return false; } else { + final TaskGroup taskGroup = new TaskGroup( + ImmutableMap.copyOf( + kafkaTask.getIOConfig() + .getStartPartitions() + .getPartitionOffsetMap() + ), kafkaTask.getIOConfig().getMinimumMessageTime(), + kafkaTask.getIOConfig().getMaximumMessageTime() + ); if (taskGroups.putIfAbsent( taskGroupId, - new TaskGroup( - ImmutableMap.copyOf( - kafkaTask.getIOConfig() - .getStartPartitions() - .getPartitionOffsetMap() - ), kafkaTask.getIOConfig().getMinimumMessageTime(), - kafkaTask.getIOConfig().getMaximumMessageTime() - ) + taskGroup ) == null) { - sequenceTaskGroup.put(generateSequenceName(taskGroupId), taskGroups.get(taskGroupId)); + sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId)); log.info("Created new task group [%d]", taskGroupId); } taskGroupsToVerify.add(taskGroupId); @@ -1253,7 +1256,7 @@ public class KafkaSupervisor implements Supervisor // killing all tasks or no task left in the group ? // clear state about the taskgroup so that get latest offset information is fetched from metadata store log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId); - sequenceTaskGroup.remove(generateSequenceName(groupId)); + sequenceTaskGroup.remove(generateSequenceName(taskGroup)); taskGroups.remove(groupId); partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET); } @@ -1429,7 +1432,7 @@ public class KafkaSupervisor implements Supervisor partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET); } - sequenceTaskGroup.remove(generateSequenceName(groupId)); + sequenceTaskGroup.remove(generateSequenceName(group)); // remove this task group from the list of current task groups now that it has been handled taskGroups.remove(groupId); } @@ -1630,8 +1633,7 @@ public class KafkaSupervisor implements Supervisor // reset partitions offsets for this task group so that they will be re-read from metadata storage partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET); - sequenceTaskGroup.remove(generateSequenceName(groupId)); - + sequenceTaskGroup.remove(generateSequenceName(group)); // kill all the tasks in this pending completion group killTasksInGroup(group); // set a flag so the other pending completion groups for this set of partitions will also stop @@ -1691,7 +1693,7 @@ public class KafkaSupervisor implements Supervisor // be recreated with the next set of offsets if (taskData.status.isSuccess()) { futures.add(stopTasksInGroup(taskGroup)); - sequenceTaskGroup.remove(generateSequenceName(groupId)); + sequenceTaskGroup.remove(generateSequenceName(taskGroup)); iTaskGroups.remove(); break; } @@ -1724,15 +1726,16 @@ public class KafkaSupervisor implements Supervisor DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get()) ) : Optional.absent()); + final TaskGroup taskGroup = new TaskGroup( + generateStartingOffsetsForPartitionGroup(groupId), + minimumMessageTime, + maximumMessageTime + ); taskGroups.put( groupId, - new TaskGroup( - generateStartingOffsetsForPartitionGroup(groupId), - minimumMessageTime, - maximumMessageTime - ) + taskGroup ); - sequenceTaskGroup.put(generateSequenceName(groupId), taskGroups.get(groupId)); + sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId)); } } @@ -1767,8 +1770,8 @@ public class KafkaSupervisor implements Supervisor for (Integer partition : startPartitions.keySet()) { endPartitions.put(partition, Long.MAX_VALUE); } - - String sequenceName = generateSequenceName(groupId); + TaskGroup group = taskGroups.get(groupId); + String sequenceName = generateSequenceName(group); Map consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties()); DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull(); @@ -1929,7 +1932,7 @@ public class KafkaSupervisor implements Supervisor String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName(); if (taskGroups.get(taskGroupId) != null) { - return generateSequenceName(taskGroupId).equals(taskSequenceName); + return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName); } else { return generateSequenceName( ((KafkaIndexTask) taskOptional.get()).getIOConfig() diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 05b7d72b6a3..1d951de9d0b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2192,12 +2192,6 @@ public class KafkaSupervisorTest extends EasyMockSupport ); } - @Override - protected String generateSequenceName(int groupId) - { - return StringUtils.format("sequenceName-%d", groupId); - } - @Override protected String generateSequenceName( Map startPartitions, @@ -2205,7 +2199,8 @@ public class KafkaSupervisorTest extends EasyMockSupport Optional maximumMessageTime ) { - return generateSequenceName(getTaskGroupIdForPartition(startPartitions.keySet().iterator().next())); + final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next()); + return StringUtils.format("sequenceName-%d", groupId); } } }