From 37edbd35f1c743fe71c3b9cfefa59a3c720acc3b Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 22 Feb 2017 15:58:49 -0800 Subject: [PATCH] YARN-6194. Cluster capacity in SchedulingPolicy is updated only on allocation file reload. (Yufei Gu via kasha) (cherry picked from commit b10e962224a8ae1c6031a05322b0cc5e564bd078) --- .../scheduler/fair/FSContext.java | 21 ++++++++++++---- .../scheduler/fair/FSQueue.java | 2 +- .../scheduler/fair/FairScheduler.java | 6 ++--- .../scheduler/fair/SchedulingPolicy.java | 19 +++++++++++++- .../DominantResourceFairnessPolicy.java | 16 ++++++------ .../scheduler/fair/TestFairScheduler.java | 8 +++--- .../TestDominantResourceFairnessPolicy.java | 25 ++++++++++++++++++- 7 files changed, 74 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java index 56bc99c586b..a4aa8f49eaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.apache.hadoop.yarn.api.records.Resource; + /** * Helper class that holds basic information to be passed around * FairScheduler classes. Think of this as a glorified map that holds key @@ -27,28 +29,37 @@ public class FSContext { private boolean preemptionEnabled = false; private float preemptionUtilizationThreshold; private FSStarvedApps starvedApps; + private FairScheduler scheduler; - public boolean isPreemptionEnabled() { + FSContext(FairScheduler scheduler) { + this.scheduler = scheduler; + } + + boolean isPreemptionEnabled() { return preemptionEnabled; } - public void setPreemptionEnabled() { + void setPreemptionEnabled() { this.preemptionEnabled = true; if (starvedApps == null) { starvedApps = new FSStarvedApps(); } } - public FSStarvedApps getStarvedApps() { + FSStarvedApps getStarvedApps() { return starvedApps; } - public float getPreemptionUtilizationThreshold() { + float getPreemptionUtilizationThreshold() { return preemptionUtilizationThreshold; } - public void setPreemptionUtilizationThreshold( + void setPreemptionUtilizationThreshold( float preemptionUtilizationThreshold) { this.preemptionUtilizationThreshold = preemptionUtilizationThreshold; } + + public Resource getClusterResource() { + return scheduler.getClusterResource(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 7e8b85872ad..b5592c55bc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -135,7 +135,7 @@ public abstract class FSQueue implements Queue, Schedulable { } public void setPolicy(SchedulingPolicy policy) { - policy.initialize(scheduler.getClusterResource()); + policy.initialize(scheduler.getContext()); this.policy = policy; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index ae13b236719..fe79d40fd6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -206,13 +206,12 @@ public class FairScheduler extends public FairScheduler() { super(FairScheduler.class.getName()); - context = new FSContext(); + context = new FSContext(this); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); } - @VisibleForTesting public FSContext getContext() { return context; } @@ -1450,8 +1449,7 @@ public class FairScheduler extends } else { allocConf = queueInfo; setQueueAcls(allocConf.getQueueAcls()); - allocConf.getDefaultSchedulingPolicy().initialize( - getClusterResource()); + allocConf.getDefaultSchedulingPolicy().initialize(getContext()); queueMgr.updateAllocationConfiguration(allocConf); applyChildDefaults(); maxRunningEnforcer.updateRunnabilityOnReload(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 3fe36f38134..9a9be8c10c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -91,9 +91,26 @@ public abstract class SchedulingPolicy { } return getInstance(clazz); } - + + /** + * Initialize the scheduling policy with cluster resources. + * @deprecated Since it doesn't track cluster resource changes, replaced by + * {@link #initialize(FSContext)}. + * + * @param clusterCapacity cluster resources + */ + @Deprecated public void initialize(Resource clusterCapacity) {} + /** + * Initialize the scheduling policy with a {@link FSContext} object, which has + * a pointer to the cluster resources among other information. + * + * @param fsContext a {@link FSContext} object which has a pointer to the + * cluster resources + */ + public void initialize(FSContext fsContext) {} + /** * The {@link ResourceCalculator} returned by this method should be used * for any calculations involving resources. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 369b8a17633..193ed4dc36c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; @@ -104,17 +105,17 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy { } @Override - public void initialize(Resource clusterCapacity) { - COMPARATOR.setClusterCapacity(clusterCapacity); + public void initialize(FSContext fsContext) { + COMPARATOR.setFSContext(fsContext); } public static class DominantResourceFairnessComparator implements Comparator { private static final int NUM_RESOURCES = ResourceType.values().length; - - private Resource clusterCapacity; - public void setClusterCapacity(Resource clusterCapacity) { - this.clusterCapacity = clusterCapacity; + private FSContext fsContext; + + public void setFSContext(FSContext fsContext) { + this.fsContext = fsContext; } @Override @@ -125,7 +126,8 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy { ResourceWeights sharesOfMinShare2 = new ResourceWeights(); ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES]; ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES]; - + Resource clusterCapacity = fsContext.getClusterResource(); + // Calculate shares of the cluster for each resource both schedulables. calculateShares(s1.getResourceUsage(), clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 4873079c0d2..9ba5013029d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3285,7 +3285,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterResource()); + drfPolicy.initialize(scheduler.getContext()); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.update(); @@ -3331,7 +3331,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterResource()); + drfPolicy.initialize(scheduler.getContext()); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.update(); @@ -3346,7 +3346,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.handle(updateEvent); Assert.assertEquals(1, app2.getLiveContainers().size()); } - + @Test public void testDRFHierarchicalQueues() throws Exception { scheduler.init(conf); @@ -3376,7 +3376,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterResource()); + drfPolicy.initialize(scheduler.getContext()); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java index a5c20c1b050..3719e2aee08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java @@ -19,12 +19,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.Comparator; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -40,7 +43,10 @@ public class TestDominantResourceFairnessPolicy { private Comparator createComparator(int clusterMem, int clusterCpu) { DominantResourceFairnessPolicy policy = new DominantResourceFairnessPolicy(); - policy.initialize(BuilderUtils.newResource(clusterMem, clusterCpu)); + FSContext fsContext = mock(FSContext.class); + when(fsContext.getClusterResource()). + thenReturn(Resources.createResource(clusterMem, clusterCpu)); + policy.initialize(fsContext); return policy.getComparator(); } @@ -160,4 +166,21 @@ public class TestDominantResourceFairnessPolicy { assertEquals(ResourceType.CPU, resourceOrder[0]); assertEquals(ResourceType.MEMORY, resourceOrder[1]); } + + @Test + public void testCompareSchedulablesWithClusterResourceChanges(){ + Schedulable schedulable1 = createSchedulable(2000, 1); + Schedulable schedulable2 = createSchedulable(1000, 2); + + // schedulable1 has share weights [1/2, 1/5], schedulable2 has share + // weights [1/4, 2/5], schedulable1 > schedulable2 since 1/2 > 2/5 + assertTrue(createComparator(4000, 5) + .compare(schedulable1, schedulable2) > 0); + + // share weights have changed because of the cluster resource change. + // schedulable1 has share weights [1/4, 1/6], schedulable2 has share + // weights [1/8, 1/3], schedulable1 < schedulable2 since 1/4 < 1/3 + assertTrue(createComparator(8000, 6) + .compare(schedulable1, schedulable2) < 0); + } }