From 3d5ade1839205db1c4a11f73bd02d847187f48ca Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Thu, 26 Mar 2020 13:23:43 +0100 Subject: [PATCH] YARN-10043. FairOrderingPolicy Improvements. Contributed by Manikandan R --- .../SchedulerApplicationAttempt.java | 10 ++ .../scheduler/policy/FairOrderingPolicy.java | 47 +++++-- .../scheduler/policy/SchedulableEntity.java | 6 + .../policy/MockSchedulableEntity.java | 12 ++ .../policy/TestFairOrderingPolicy.java | 117 +++++++++++++++++- 5 files changed, 181 insertions(+), 11 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 1a1f40145f2..657c03cfd15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.Schedulabl import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -209,6 +210,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { private String nodeLabelExpression; + private final long startTime; + public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, String user, Queue queue, AbstractUsersManager abstractUsersManager, RMContext rmContext) { @@ -242,6 +245,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); + startTime = SystemClock.getInstance().getTime(); } public void setOpportunisticContainerContext( @@ -1487,4 +1491,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { public String getPartition() { return nodeLabelExpression == null ? "" : nodeLabelExpression; } + + + @Override + public long getStartTime() { + return startTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java index 1f4522ce3f9..9e66582e04f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java @@ -28,14 +28,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; /** - * An OrderingPolicy which orders SchedulableEntities for fairness (see - * FairScheduler - * FairSharePolicy), generally, processes with lesser usage are lesser. If - * sizedBasedWeight is set to true then an application with high demand - * may be prioritized ahead of an application with less usage. This - * is to offset the tendency to favor small apps, which could result in - * starvation for large apps if many small ones enter and leave the queue - * continuously (optional, default false) + * + * FairOrderingPolicy comparison goes through following steps. + * + * 1.Fairness based comparison. SchedulableEntities with lesser usage would be + * preferred when compared to another. If sizedBasedWeight is set to true then + * an application with high demand may be prioritized ahead of an application + * with less usage. This is to offset the tendency to favor small apps, which + * could result in starvation for large apps if many small ones enter and leave + * the queue continuously (optional, default false) + * + * 2. Compare using job submit time. SchedulableEntities submitted earlier would + * be preferred than later. + * + * 3. Compare demands. SchedulableEntities without resource demand get lower + * priority than ones which have demands. */ public class FairOrderingPolicy extends AbstractComparatorOrderingPolicy { @@ -46,6 +53,30 @@ public class FairOrderingPolicy extends AbstractCom @Override public int compare(final SchedulableEntity r1, final SchedulableEntity r2) { int res = (int) Math.signum( getMagnitude(r1) - getMagnitude(r2) ); + + if (res == 0) { + res = (int) Math.signum(r1.getStartTime() - r2.getStartTime()); + } + + if (res == 0) { + res = compareDemand(r1, r2); + } + return res; + } + + private int compareDemand(SchedulableEntity s1, SchedulableEntity s2) { + int res = 0; + long demand1 = s1.getSchedulingResourceUsage() + .getCachedDemand(CommonNodeLabelsManager.ANY).getMemorySize(); + long demand2 = s2.getSchedulingResourceUsage() + .getCachedDemand(CommonNodeLabelsManager.ANY).getMemorySize(); + + if ((demand1 == 0) && (demand2 > 0)) { + res = 1; + } else if ((demand2 == 0) && (demand1 > 0)) { + res = -1; + } + return res; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java index be835560ade..e2a690947ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java @@ -60,4 +60,10 @@ public interface SchedulableEntity { * @return partition */ String getPartition(); + + /** + * Start time of the job. + * @return start time + */ + long getStartTime(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java index 62f7a4956b9..4c75aced19e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -31,6 +32,7 @@ public class MockSchedulableEntity implements SchedulableEntity { private Priority priority; private boolean isRecovering; private String partition = ""; + private long startTime; public MockSchedulableEntity() { } @@ -39,6 +41,7 @@ public class MockSchedulableEntity implements SchedulableEntity { this.serial = serial; this.priority = Priority.newInstance(priority); this.isRecovering = isRecovering; + this.startTime = SystemClock.getInstance().getTime(); } public void setId(String id) { @@ -108,4 +111,13 @@ public class MockSchedulableEntity implements SchedulableEntity { public void setPartition(String partition) { this.partition = partition; } + + @Override + public long getStartTime() { + return this.startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java index eab4eb702f5..d51f9f5a250 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; +import static org.junit.Assert.assertEquals; + import java.util.*; import org.apache.hadoop.yarn.api.records.NodeId; @@ -47,7 +49,8 @@ public class TestFairOrderingPolicy { MockSchedulableEntity r1 = new MockSchedulableEntity(); MockSchedulableEntity r2 = new MockSchedulableEntity(); - Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + assertEquals("Comparator Output", 0, + policy.getComparator().compare(r1, r2)); //consumption r1.setUsed(Resources.createResource(1, 0)); @@ -65,7 +68,8 @@ public class TestFairOrderingPolicy { MockSchedulableEntity r2 = new MockSchedulableEntity(); //No changes, equal - Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + assertEquals("Comparator Output", 0, + policy.getComparator().compare(r1, r2)); r1.setUsed(Resources.createResource(4 * GB)); r2.setUsed(Resources.createResource(4 * GB)); @@ -79,7 +83,8 @@ public class TestFairOrderingPolicy { r2.getSchedulingResourceUsage()); //Same, equal - Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + assertEquals("Comparator Output", 0, + policy.getComparator().compare(r1, r2)); r2.setUsed(Resources.createResource(5 * GB)); r2.setPending(Resources.createResource(5 * GB)); @@ -235,4 +240,110 @@ public class TestFairOrderingPolicy { } } + @Test + public void testOrderingUsingUsedAndPendingResources() { + FairOrderingPolicy policy = + new FairOrderingPolicy<>(); + policy.setSizeBasedWeight(true); + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + r1.setUsed(Resources.createResource(4 * GB)); + r2.setUsed(Resources.createResource(4 * GB)); + + r1.setPending(Resources.createResource(4 * GB)); + r2.setPending(Resources.createResource(4 * GB)); + + AbstractComparatorOrderingPolicy + .updateSchedulingResourceUsage(r1.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy + .updateSchedulingResourceUsage(r2.getSchedulingResourceUsage()); + + // Same, equal + assertEquals("Comparator Output", 0, + policy.getComparator().compare(r1, r2)); + + r1.setUsed(Resources.createResource(4 * GB)); + r2.setUsed(Resources.createResource(8 * GB)); + + r1.setPending(Resources.createResource(4 * GB)); + r2.setPending(Resources.createResource(8 * GB)); + + AbstractComparatorOrderingPolicy + .updateSchedulingResourceUsage(r1.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy + .updateSchedulingResourceUsage(r2.getSchedulingResourceUsage()); + + Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0); + } + + @Test + public void testOrderingUsingAppSubmitTime() { + FairOrderingPolicy policy = + new FairOrderingPolicy<>(); + policy.setSizeBasedWeight(true); + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + // R1, R2 has been started at same time + assertEquals(r1.getStartTime(), r2.getStartTime()); + + // No changes, equal + assertEquals("Comparator Output", 0, + policy.getComparator().compare(r1, r2)); + + // R2 has been started after R1 + r1.setStartTime(5); + r2.setStartTime(10); + + Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0); + + // R1 has been started after R2 + r1.setStartTime(10); + r2.setStartTime(5); + + Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0); + } + + @Test + public void testOrderingUsingAppDemand() { + FairOrderingPolicy policy = + new FairOrderingPolicy(); + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + r1.setUsed(Resources.createResource(0)); + r2.setUsed(Resources.createResource(0)); + + AbstractComparatorOrderingPolicy + .updateSchedulingResourceUsage(r1.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy + .updateSchedulingResourceUsage(r2.getSchedulingResourceUsage()); + + // Same, equal + assertEquals("Comparator Output", 0, + policy.getComparator().compare(r1, r2)); + + // Compare demands ensures entity without resource demands gets lower + // priority + r1.setPending(Resources.createResource(0)); + r2.setPending(Resources.createResource(8 * GB)); + AbstractComparatorOrderingPolicy + .updateSchedulingResourceUsage(r1.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy + .updateSchedulingResourceUsage(r2.getSchedulingResourceUsage()); + + Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0); + + // When both entity has certain demands, then there is no actual comparison + r1.setPending(Resources.createResource(4 * GB)); + r2.setPending(Resources.createResource(12 * GB)); + AbstractComparatorOrderingPolicy + .updateSchedulingResourceUsage(r1.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy + .updateSchedulingResourceUsage(r2.getSchedulingResourceUsage()); + + assertEquals("Comparator Output", 0, + policy.getComparator().compare(r1, r2)); + } }