From ccaba2561a7b8aa06f33342d504e38a095932b00 Mon Sep 17 00:00:00 2001 From: Eric Payne Date: Tue, 21 Dec 2021 19:05:39 +0000 Subject: [PATCH] YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi). (cherry picked from commit e2d6fd075dff4e6ea290ec638f0a3f6688e76335) --- ...riorityUtilizationQueueOrderingPolicy.java | 91 +++++++++++++------ 1 file changed, 65 insertions(+), 26 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java index d3e2f8981a8..995c2ea6b36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -28,12 +28,11 @@ .CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.Resources; -import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * For two queues with the same priority: @@ -101,19 +100,21 @@ public static int compare(double relativeAssigned1, double relativeAssigned2, /** * Comparator that both looks at priority and utilization */ - private class PriorityQueueComparator implements Comparator { + private class PriorityQueueComparator + implements Comparator { @Override - public int compare(CSQueue q1, CSQueue q2) { + public int compare(PriorityQueueResourcesForSorting q1Sort, + PriorityQueueResourcesForSorting q2Sort) { String p = partitionToLookAt.get(); - int rc = compareQueueAccessToPartition(q1, q2, p); + int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p); if (0 != rc) { return rc; } - float q1AbsCapacity = q1.getQueueCapacities().getAbsoluteCapacity(p); - float q2AbsCapacity = q2.getQueueCapacities().getAbsoluteCapacity(p); + float q1AbsCapacity = q1Sort.absoluteCapacity; + float q2AbsCapacity = q2Sort.absoluteCapacity; //If q1's abs capacity > 0 and q2 is 0, then prioritize q1 if (Float.compare(q1AbsCapacity, 0f) > 0 && Float.compare(q2AbsCapacity, @@ -127,28 +128,33 @@ public int compare(CSQueue q1, CSQueue q2) { q2AbsCapacity, 0f) == 0) { // both q1 has 0 and q2 has 0 capacity, then fall back to using // priority, abs used capacity to prioritize - float used1 = q1.getQueueCapacities().getAbsoluteUsedCapacity(p); - float used2 = q2.getQueueCapacities().getAbsoluteUsedCapacity(p); + float used1 = q1Sort.absoluteUsedCapacity; + float used2 = q2Sort.absoluteUsedCapacity; - return compare(q1, q2, used1, used2, p); + return compare(q1Sort, q2Sort, used1, used2, + q1Sort.queue.getPriority(). + getPriority(), q2Sort.queue.getPriority().getPriority()); } else{ // both q1 has positive abs capacity and q2 has positive abs // capacity - float used1 = q1.getQueueCapacities().getUsedCapacity(p); - float used2 = q2.getQueueCapacities().getUsedCapacity(p); + float used1 = q1Sort.usedCapacity; + float used2 = q2Sort.usedCapacity; - return compare(q1, q2, used1, used2, p); + return compare(q1Sort, q2Sort, used1, used2, + q1Sort.queue.getPriority().getPriority(), + q2Sort.queue.getPriority().getPriority()); } } - private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used, - String partition) { + private int compare(PriorityQueueResourcesForSorting q1Sort, + PriorityQueueResourcesForSorting q2Sort, float q1Used, + float q2Used, int q1Prior, int q2Prior) { int p1 = 0; int p2 = 0; if (respectPriority) { - p1 = q1.getPriority().getPriority(); - p2 = q2.getPriority().getPriority(); + p1 = q1Prior; + p2 = q2Prior; } int rc = PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used, @@ -158,16 +164,16 @@ private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used, // capacity goes first if (0 == rc) { Resource minEffRes1 = - q1.getQueueResourceQuotas().getConfiguredMinResource(partition); + q1Sort.configuredMinResource; Resource minEffRes2 = - q2.getQueueResourceQuotas().getConfiguredMinResource(partition); + q2Sort.configuredMinResource; if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals( Resources.none())) { return minEffRes2.compareTo(minEffRes1); } - float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(partition); - float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(partition); + float abs1 = q1Sort.absoluteCapacity; + float abs2 = q2Sort.absoluteCapacity; return Float.compare(abs2, abs1); } @@ -203,6 +209,37 @@ private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, } } + /** + * A simple storage class to represent a snapshot of a queue. + */ + public static class PriorityQueueResourcesForSorting { + private final float absoluteUsedCapacity; + private final float usedCapacity; + private final Resource configuredMinResource; + private final float absoluteCapacity; + private final CSQueue queue; + + PriorityQueueResourcesForSorting(CSQueue queue) { + this.queue = queue; + this.absoluteUsedCapacity = + queue.getQueueCapacities(). + getAbsoluteUsedCapacity(partitionToLookAt.get()); + this.usedCapacity = + queue.getQueueCapacities(). + getUsedCapacity(partitionToLookAt.get()); + this.absoluteCapacity = + queue.getQueueCapacities(). + getAbsoluteCapacity(partitionToLookAt.get()); + this.configuredMinResource = + queue.getQueueResourceQuotas(). + getConfiguredMinResource(partitionToLookAt.get()); + } + + public CSQueue getQueue() { + return queue; + } + } + public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) { this.respectPriority = respectPriority; } @@ -214,12 +251,14 @@ public void setQueues(List queues) { @Override public Iterator getAssignmentIterator(String partition) { - // Since partitionToLookAt is a thread local variable, and every time we - // copy and sort queues, so it's safe for multi-threading environment. + // partitionToLookAt is a thread local variable, therefore it is safe to mutate it. PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition); - List sortedQueue = new ArrayList<>(queues); - Collections.sort(sortedQueue, new PriorityQueueComparator()); - return sortedQueue.iterator(); + + // Sort the snapshot of the queues in order to avoid breaking the prerequisites of TimSort. + // See YARN-10178 for details. + return queues.stream().map(PriorityQueueResourcesForSorting::new).sorted( + new PriorityQueueComparator()).map(PriorityQueueResourcesForSorting::getQueue).collect( + Collectors.toList()).iterator(); } @Override