From 9ee5265fb33b66421dc94034f54307fae7cff096 Mon Sep 17 00:00:00 2001 From: Eric Payne Date: Tue, 21 Dec 2021 19:48:06 +0000 Subject: [PATCH] YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract'. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi). --- ...riorityUtilizationQueueOrderingPolicy.java | 65 +++++++++++++++---- 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java index ada665de409..c475be34203 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Iterator; @@ -95,24 +96,26 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli /** * Comparator that both looks at priority and utilization */ - private class PriorityQueueComparator implements Comparator { + private class PriorityQueueComparator + implements 
Comparator { @Override - public int compare(CSQueue q1, CSQueue q2) { + public int compare(PriorityQueueResourcesForSorting q1Sort, + PriorityQueueResourcesForSorting q2Sort) { String p = partitionToLookAt.get(); - int rc = compareQueueAccessToPartition(q1, q2, p); + int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p); if (0 != rc) { return rc; } - float used1 = q1.getQueueCapacities().getUsedCapacity(p); - float used2 = q2.getQueueCapacities().getUsedCapacity(p); + float used1 = q1Sort.usedCapacity; + float used2 = q2Sort.usedCapacity; int p1 = 0; int p2 = 0; if (respectPriority) { - p1 = q1.getPriority().getPriority(); - p2 = q2.getPriority().getPriority(); + p1 = q1Sort.queue.getPriority().getPriority(); + p2 = q2Sort.queue.getPriority().getPriority(); } rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2); @@ -120,8 +123,8 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli // For queue with same used ratio / priority, queue with higher configured // capacity goes first if (0 == rc) { - float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p); - float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p); + float abs1 = q1Sort.absoluteCapacity; + float abs2 = q2Sort.absoluteCapacity; return Float.compare(abs2, abs1); } @@ -156,6 +159,29 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli } } + /** + * A simple storage class to represent a snapshot of a queue. + */ + public static class PriorityQueueResourcesForSorting { + private final float usedCapacity; + private final float absoluteCapacity; + private final CSQueue queue; + + PriorityQueueResourcesForSorting(CSQueue queue) { + this.queue = queue; + this.usedCapacity = + queue.getQueueCapacities(). + getUsedCapacity(partitionToLookAt.get()); + this.absoluteCapacity = + queue.getQueueCapacities(). 
+ getAbsoluteCapacity(partitionToLookAt.get()); + } + + public CSQueue getQueue() { + return queue; + } + } + public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) { this.respectPriority = respectPriority; } @@ -167,12 +193,23 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli @Override public Iterator getAssignmentIterator(String partition) { - // Since partitionToLookAt is a thread local variable, and every time we - // copy and sort queues, so it's safe for multi-threading environment. + // partitionToLookAt is a thread local variable, therefore it is safe to mutate it. PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition); - List sortedQueue = new ArrayList<>(queues); - Collections.sort(sortedQueue, new PriorityQueueComparator()); - return sortedQueue.iterator(); + + // Sort the snapshot of the queues in order to avoid breaking the prerequisites of TimSort. + // See YARN-10178 for details. + List queueSnapshots = new ArrayList<>(); + for (CSQueue queue : queues) { + queueSnapshots.add(new PriorityQueueResourcesForSorting(queue)); + } + Collections.sort(queueSnapshots, new PriorityQueueComparator()); + + List sortedQueues = new ArrayList<>(); + for (PriorityQueueResourcesForSorting queueSnapshot : queueSnapshots) { + sortedQueues.add(queueSnapshot.queue); + } + + return sortedQueues.iterator(); } @Override