From 79a46599f76e470527ad94b0894dacb28db01465 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Fri, 5 Feb 2021 17:08:16 +0100 Subject: [PATCH] YARN-10428. Zombie applications in the YARN queue using FAIR + sizebasedweight. Contributed by Guang Yang, Andras Gyori --- .../scheduler/policy/FairOrderingPolicy.java | 6 ++- .../policy/TestFairOrderingPolicy.java | 42 +++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java index 863d8e228b4..645492e9496 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java @@ -100,10 +100,12 @@ public class FairOrderingPolicy extends AbstractCom private double getMagnitude(SchedulableEntity r) { double mag = r.getSchedulingResourceUsage().getCachedUsed( CommonNodeLabelsManager.ANY).getMemorySize(); - if (sizeBasedWeight) { + if (sizeBasedWeight && mag != 0) { double weight = Math.log1p(r.getSchedulingResourceUsage().getCachedDemand( CommonNodeLabelsManager.ANY).getMemorySize()) / Math.log(2); - mag = mag / weight; + if (weight != 0) { + mag = mag / weight; + } } return mag; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java index d51f9f5a250..ac5caedc9db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java @@ -23,7 +23,9 @@ import static org.junit.Assert.assertEquals; import java.util.*; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData; import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; @@ -346,4 +348,44 @@ public class TestFairOrderingPolicy { assertEquals("Comparator Output", 0, policy.getComparator().compare(r1, r2)); } + + @Test + public void testRemoveEntitiesWithSizeBasedWeightAsCompletedJobs() { + FairOrderingPolicy policy = + new FairOrderingPolicy(); + policy.setSizeBasedWeight(true); + + // Add 10 different schedulable entities + List entities = new ArrayList<>(10); + for (int i = 1; i <= 10; i++) { + MockSchedulableEntity r = new MockSchedulableEntity(); + r.setApplicationPriority(Priority.newInstance(i)); + r.setUsed(Resources.createResource(4 * i)); + r.setPending(Resources.createResource(4 * i)); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r.getSchedulingResourceUsage()); + policy.addSchedulableEntity(r); + entities.add(r); + } + + // Mark the first 5 entities as completed by setting + // the resources to 0 + for (int i = 0; i < 5; i++) { + MockSchedulableEntity r = entities.get(i); + r.getSchedulingResourceUsage().setCachedUsed( + CommonNodeLabelsManager.ANY, Resources.createResource(0)); + r.getSchedulingResourceUsage().setCachedPending( + CommonNodeLabelsManager.ANY, Resources.createResource(0)); + policy.entityRequiresReordering(r); + } + + policy.reorderScheduleEntities(); + + // Remove the first 5 elements + for (int i = 0; i < 5; i++) { + policy.removeSchedulableEntity(entities.get(i)); + } + + Assert.assertEquals(5, policy.getNumSchedulableEntities()); + } }