From 00ea8c00acf61a9c189ac01ed20febfac9974925 Mon Sep 17 00:00:00 2001 From: QiuMM Date: Thu, 27 Sep 2018 10:01:36 +0800 Subject: [PATCH] using Entry directly instead of Map.Entry in KafkaSupervisor (#6291) --- .../kafka/supervisor/KafkaSupervisor.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 07e98f36d33..67900f61e23 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -783,7 +783,7 @@ public class KafkaSupervisor implements Supervisor // defend against consecutive reset requests from replicas // as well as the case where the metadata store do not have an entry for the reset partitions boolean doReset = false; - for (Map.Entry resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions() + for (Entry resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions() .getPartitionOffsetMap() .entrySet()) { final Long partitionOffsetInMetadataStore = currentMetadata == null @@ -866,7 +866,7 @@ public class KafkaSupervisor implements Supervisor // checkTaskDuration() will be triggered. This is better than just telling these tasks to publish whatever they // have, as replicas that are supposed to publish the same segment may not have read the same set of offsets. for (TaskGroup taskGroup : taskGroups.values()) { - for (Map.Entry entry : taskGroup.tasks.entrySet()) { + for (Entry entry : taskGroup.tasks.entrySet()) { if (taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown())) { killTask(entry.getKey()); } else { @@ -914,7 +914,7 @@ public class KafkaSupervisor implements Supervisor { StringBuilder sb = new StringBuilder(); - for (Map.Entry entry : startPartitions.entrySet()) { + for (Entry entry : startPartitions.entrySet()) { sb.append(StringUtils.format("+%d(%d)", entry.getKey(), entry.getValue())); } String partitionOffsetStr = sb.toString().substring(1); @@ -1073,7 +1073,7 @@ public class KafkaSupervisor implements Supervisor // existing) so that the next tasks will start reading from where this task left off Map publishingTaskEndOffsets = taskClient.getEndOffsets(taskId); - for (Map.Entry entry : publishingTaskEndOffsets.entrySet()) { + for (Entry entry : publishingTaskEndOffsets.entrySet()) { Integer partition = entry.getKey(); Long offset = entry.getValue(); ConcurrentHashMap partitionOffsets = partitionGroups.get( @@ -1380,7 +1380,7 @@ public class KafkaSupervisor implements Supervisor // update status (and startTime if unknown) of current tasks in taskGroups for (TaskGroup group : taskGroups.values()) { - for (Map.Entry entry : group.tasks.entrySet()) { + for (Entry entry : group.tasks.entrySet()) { final String taskId = entry.getKey(); final TaskData taskData = entry.getValue(); @@ -1423,7 +1423,7 @@ public class KafkaSupervisor implements Supervisor // update status of pending completion tasks in pendingCompletionTaskGroups for (List taskGroups : pendingCompletionTaskGroups.values()) { for (TaskGroup group : taskGroups) { - for (Map.Entry entry : group.tasks.entrySet()) { + for (Entry entry : group.tasks.entrySet()) { entry.getValue().status = taskStorage.getStatus(entry.getKey()).get(); } } @@ -1446,7 +1446,7 @@ public class KafkaSupervisor implements Supervisor final List>> futures = Lists.newArrayList(); final List futureGroupIds = Lists.newArrayList(); - for (Map.Entry entry : taskGroups.entrySet()) { + for (Entry entry : taskGroups.entrySet()) { Integer groupId = entry.getKey(); TaskGroup group = entry.getValue(); @@ -1479,7 +1479,7 @@ public class KafkaSupervisor implements Supervisor pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group); // set endOffsets as the next startOffsets - for (Map.Entry entry : endOffsets.entrySet()) { + for (Entry entry : endOffsets.entrySet()) { partitionGroups.get(groupId).put(entry.getKey(), entry.getValue()); } } else { @@ -1505,9 +1505,9 @@ public class KafkaSupervisor implements Supervisor { if (finalize) { // 1) Check if any task completed (in which case we're done) and kill unassigned tasks - Iterator> i = taskGroup.tasks.entrySet().iterator(); + Iterator> i = taskGroup.tasks.entrySet().iterator(); while (i.hasNext()) { - Map.Entry taskEntry = i.next(); + Entry taskEntry = i.next(); String taskId = taskEntry.getKey(); TaskData task = taskEntry.getValue(); @@ -1569,7 +1569,7 @@ public class KafkaSupervisor implements Supervisor taskGroup.tasks.remove(taskId); } else { // otherwise build a map of the highest offsets seen - for (Map.Entry offset : result.entrySet()) { + for (Entry offset : result.entrySet()) { if (!endOffsets.containsKey(offset.getKey()) || endOffsets.get(offset.getKey()).compareTo(offset.getValue()) < 0) { endOffsets.put(offset.getKey(), offset.getValue()); @@ -1647,7 +1647,7 @@ public class KafkaSupervisor implements Supervisor { List> futures = Lists.newArrayList(); - for (Map.Entry> pendingGroupList : pendingCompletionTaskGroups.entrySet()) { + for (Entry> pendingGroupList : pendingCompletionTaskGroups.entrySet()) { boolean stopTasksInTaskGroup = false; Integer groupId = pendingGroupList.getKey(); @@ -1728,9 +1728,9 @@ public class KafkaSupervisor implements Supervisor private void checkCurrentTaskState() throws ExecutionException, InterruptedException, TimeoutException { List> futures = Lists.newArrayList(); - Iterator> iTaskGroups = taskGroups.entrySet().iterator(); + Iterator> iTaskGroups = taskGroups.entrySet().iterator(); while (iTaskGroups.hasNext()) { - Map.Entry taskGroupEntry = iTaskGroups.next(); + Entry taskGroupEntry = iTaskGroups.next(); Integer groupId = taskGroupEntry.getKey(); TaskGroup taskGroup = taskGroupEntry.getValue(); @@ -1742,9 +1742,9 @@ public class KafkaSupervisor implements Supervisor log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.taskIds()); - Iterator> iTasks = taskGroup.tasks.entrySet().iterator(); + Iterator> iTasks = taskGroup.tasks.entrySet().iterator(); while (iTasks.hasNext()) { - Map.Entry task = iTasks.next(); + Entry task = iTasks.next(); String taskId = task.getKey(); TaskData taskData = task.getValue(); @@ -1817,7 +1817,7 @@ public class KafkaSupervisor implements Supervisor // iterate through all the current task groups and make sure each one has the desired number of replica tasks boolean createdTask = false; - for (Map.Entry entry : taskGroups.entrySet()) { + for (Entry entry : taskGroups.entrySet()) { TaskGroup taskGroup = entry.getValue(); Integer groupId = entry.getKey(); @@ -1910,7 +1910,7 @@ public class KafkaSupervisor implements Supervisor private ImmutableMap generateStartingOffsetsForPartitionGroup(int groupId) { ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Map.Entry entry : partitionGroups.get(groupId).entrySet()) { + for (Entry entry : partitionGroups.get(groupId).entrySet()) { Integer partition = entry.getKey(); Long offset = entry.getValue(); @@ -2035,7 +2035,7 @@ public class KafkaSupervisor implements Supervisor } final List> futures = Lists.newArrayList(); - for (Map.Entry entry : taskGroup.tasks.entrySet()) { + for (Entry entry : taskGroup.tasks.entrySet()) { final String taskId = entry.getKey(); final TaskData taskData = entry.getValue(); if (taskData.status == null) { @@ -2121,7 +2121,7 @@ public class KafkaSupervisor implements Supervisor try { for (TaskGroup taskGroup : taskGroups.values()) { - for (Map.Entry entry : taskGroup.tasks.entrySet()) { + for (Entry entry : taskGroup.tasks.entrySet()) { String taskId = entry.getKey(); @Nullable DateTime startTime = entry.getValue().startTime; @@ -2149,7 +2149,7 @@ public class KafkaSupervisor implements Supervisor for (List taskGroups : pendingCompletionTaskGroups.values()) { for (TaskGroup taskGroup : taskGroups) { - for (Map.Entry entry : taskGroup.tasks.entrySet()) { + for (Entry entry : taskGroup.tasks.entrySet()) { String taskId = entry.getKey(); @Nullable DateTime startTime = entry.getValue().startTime; @@ -2218,7 +2218,7 @@ public class KafkaSupervisor implements Supervisor .stream() .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) .flatMap(taskData -> taskData.getValue().currentOffsets.entrySet().stream()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max)); + .collect(Collectors.toMap(Entry::getKey, Entry::getValue, Long::max)); } private Map getLagPerPartition(Map currentOffsets) @@ -2228,7 +2228,7 @@ public class KafkaSupervisor implements Supervisor .stream() .collect( Collectors.toMap( - Map.Entry::getKey, + Entry::getKey, e -> latestOffsetsFromKafka != null && latestOffsetsFromKafka.get(e.getKey()) != null && e.getValue() != null