mirror of https://github.com/apache/druid.git

using Entry directly instead of Map.Entry in KafkaSupervisor (#6291)

commit 00ea8c00ac
parent 6fb503c073
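The hunks below touch every iteration site in KafkaSupervisor.java. For context, the shorter `Entry` spelling only compiles because Java allows a single-type import of the nested interface, i.e. `import java.util.Map.Entry;` (the import line itself is outside these hunks). A minimal sketch of the idiom, with a hypothetical class name and a stand-in map, not taken from this commit:

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry; // nested-type import: lets code write Entry<K, V> instead of Map.Entry<K, V>

public class EntryImportSketch
{
  public static void main(String[] args)
  {
    // Stand-in for the partition -> offset maps iterated throughout KafkaSupervisor.
    Map<Integer, Long> offsets = new HashMap<>();
    offsets.put(0, 100L);
    offsets.put(1, 250L);

    // Semantically identical to `for (Map.Entry<Integer, Long> e : ...)`; only the spelling changes.
    for (Entry<Integer, Long> e : offsets.entrySet()) {
      System.out.printf("partition %d -> offset %d%n", e.getKey(), e.getValue());
    }
  }
}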
@@ -783,7 +783,7 @@ public class KafkaSupervisor implements Supervisor
     // defend against consecutive reset requests from replicas
     // as well as the case where the metadata store do not have an entry for the reset partitions
     boolean doReset = false;
-    for (Map.Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
+    for (Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
         .getPartitionOffsetMap()
         .entrySet()) {
       final Long partitionOffsetInMetadataStore = currentMetadata == null
@@ -866,7 +866,7 @@ public class KafkaSupervisor implements Supervisor
     // checkTaskDuration() will be triggered. This is better than just telling these tasks to publish whatever they
     // have, as replicas that are supposed to publish the same segment may not have read the same set of offsets.
     for (TaskGroup taskGroup : taskGroups.values()) {
-      for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+      for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
         if (taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown())) {
           killTask(entry.getKey());
         } else {
@@ -914,7 +914,7 @@ public class KafkaSupervisor implements Supervisor
   {
     StringBuilder sb = new StringBuilder();

-    for (Map.Entry<Integer, Long> entry : startPartitions.entrySet()) {
+    for (Entry<Integer, Long> entry : startPartitions.entrySet()) {
       sb.append(StringUtils.format("+%d(%d)", entry.getKey(), entry.getValue()));
     }
     String partitionOffsetStr = sb.toString().substring(1);
@@ -1073,7 +1073,7 @@ public class KafkaSupervisor implements Supervisor
     // existing) so that the next tasks will start reading from where this task left off
     Map<Integer, Long> publishingTaskEndOffsets = taskClient.getEndOffsets(taskId);

-    for (Map.Entry<Integer, Long> entry : publishingTaskEndOffsets.entrySet()) {
+    for (Entry<Integer, Long> entry : publishingTaskEndOffsets.entrySet()) {
       Integer partition = entry.getKey();
       Long offset = entry.getValue();
       ConcurrentHashMap<Integer, Long> partitionOffsets = partitionGroups.get(
@@ -1380,7 +1380,7 @@ public class KafkaSupervisor implements Supervisor

     // update status (and startTime if unknown) of current tasks in taskGroups
     for (TaskGroup group : taskGroups.values()) {
-      for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
+      for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
         final String taskId = entry.getKey();
         final TaskData taskData = entry.getValue();

@@ -1423,7 +1423,7 @@ public class KafkaSupervisor implements Supervisor
     // update status of pending completion tasks in pendingCompletionTaskGroups
     for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
       for (TaskGroup group : taskGroups) {
-        for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
+        for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
           entry.getValue().status = taskStorage.getStatus(entry.getKey()).get();
         }
       }
@@ -1446,7 +1446,7 @@ public class KafkaSupervisor implements Supervisor
     final List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
     final List<Integer> futureGroupIds = Lists.newArrayList();

-    for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
+    for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
       Integer groupId = entry.getKey();
       TaskGroup group = entry.getValue();

@@ -1479,7 +1479,7 @@ public class KafkaSupervisor implements Supervisor
         pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);

         // set endOffsets as the next startOffsets
-        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
+        for (Entry<Integer, Long> entry : endOffsets.entrySet()) {
           partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
         }
       } else {
@@ -1505,9 +1505,9 @@ public class KafkaSupervisor implements Supervisor
   {
     if (finalize) {
       // 1) Check if any task completed (in which case we're done) and kill unassigned tasks
-      Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
+      Iterator<Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
       while (i.hasNext()) {
-        Map.Entry<String, TaskData> taskEntry = i.next();
+        Entry<String, TaskData> taskEntry = i.next();
         String taskId = taskEntry.getKey();
         TaskData task = taskEntry.getValue();

@@ -1569,7 +1569,7 @@ public class KafkaSupervisor implements Supervisor
           taskGroup.tasks.remove(taskId);

         } else { // otherwise build a map of the highest offsets seen
-          for (Map.Entry<Integer, Long> offset : result.entrySet()) {
+          for (Entry<Integer, Long> offset : result.entrySet()) {
            if (!endOffsets.containsKey(offset.getKey())
                || endOffsets.get(offset.getKey()).compareTo(offset.getValue()) < 0) {
              endOffsets.put(offset.getKey(), offset.getValue());
@@ -1647,7 +1647,7 @@ public class KafkaSupervisor implements Supervisor
   {
     List<ListenableFuture<?>> futures = Lists.newArrayList();

-    for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : pendingCompletionTaskGroups.entrySet()) {
+    for (Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : pendingCompletionTaskGroups.entrySet()) {

       boolean stopTasksInTaskGroup = false;
       Integer groupId = pendingGroupList.getKey();
@@ -1728,9 +1728,9 @@ public class KafkaSupervisor implements Supervisor
   private void checkCurrentTaskState() throws ExecutionException, InterruptedException, TimeoutException
   {
     List<ListenableFuture<?>> futures = Lists.newArrayList();
-    Iterator<Map.Entry<Integer, TaskGroup>> iTaskGroups = taskGroups.entrySet().iterator();
+    Iterator<Entry<Integer, TaskGroup>> iTaskGroups = taskGroups.entrySet().iterator();
     while (iTaskGroups.hasNext()) {
-      Map.Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
+      Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
       Integer groupId = taskGroupEntry.getKey();
       TaskGroup taskGroup = taskGroupEntry.getValue();

@@ -1742,9 +1742,9 @@ public class KafkaSupervisor implements Supervisor

       log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.taskIds());

-      Iterator<Map.Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
+      Iterator<Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
       while (iTasks.hasNext()) {
-        Map.Entry<String, TaskData> task = iTasks.next();
+        Entry<String, TaskData> task = iTasks.next();
         String taskId = task.getKey();
         TaskData taskData = task.getValue();

@@ -1817,7 +1817,7 @@ public class KafkaSupervisor implements Supervisor

     // iterate through all the current task groups and make sure each one has the desired number of replica tasks
     boolean createdTask = false;
-    for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
+    for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
       TaskGroup taskGroup = entry.getValue();
       Integer groupId = entry.getKey();

@@ -1910,7 +1910,7 @@ public class KafkaSupervisor implements Supervisor
   private ImmutableMap<Integer, Long> generateStartingOffsetsForPartitionGroup(int groupId)
   {
     ImmutableMap.Builder<Integer, Long> builder = ImmutableMap.builder();
-    for (Map.Entry<Integer, Long> entry : partitionGroups.get(groupId).entrySet()) {
+    for (Entry<Integer, Long> entry : partitionGroups.get(groupId).entrySet()) {
       Integer partition = entry.getKey();
       Long offset = entry.getValue();

@@ -2035,7 +2035,7 @@ public class KafkaSupervisor implements Supervisor
     }

     final List<ListenableFuture<Void>> futures = Lists.newArrayList();
-    for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+    for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
       final String taskId = entry.getKey();
       final TaskData taskData = entry.getValue();
       if (taskData.status == null) {
@@ -2121,7 +2121,7 @@ public class KafkaSupervisor implements Supervisor

     try {
       for (TaskGroup taskGroup : taskGroups.values()) {
-        for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+        for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
           String taskId = entry.getKey();
           @Nullable
           DateTime startTime = entry.getValue().startTime;
@@ -2149,7 +2149,7 @@ public class KafkaSupervisor implements Supervisor

     for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
       for (TaskGroup taskGroup : taskGroups) {
-        for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+        for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
           String taskId = entry.getKey();
           @Nullable
           DateTime startTime = entry.getValue().startTime;
@@ -2218,7 +2218,7 @@ public class KafkaSupervisor implements Supervisor
         .stream()
         .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
         .flatMap(taskData -> taskData.getValue().currentOffsets.entrySet().stream())
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
+        .collect(Collectors.toMap(Entry::getKey, Entry::getValue, Long::max));
   }

   private Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
@@ -2228,7 +2228,7 @@ public class KafkaSupervisor implements Supervisor
         .stream()
         .collect(
             Collectors.toMap(
-                Map.Entry::getKey,
+                Entry::getKey,
                 e -> latestOffsetsFromKafka != null
                      && latestOffsetsFromKafka.get(e.getKey()) != null
                      && e.getValue() != null