diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e85356eb25d..67b1b2bf5df 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -326,6 +326,9 @@ Release 2.1.0-beta - UNRELEASED YARN-803. factor out scheduler config validation from the ResourceManager to each scheduler implementation. (tucu) + YARN-789. Enable zero capabilities resource requests in fair scheduler. + (tucu) + OPTIMIZATIONS YARN-512. Log aggregation root directory check is more expensive than it diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java index 5a691afab70..6d82db77d34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java @@ -54,19 +54,25 @@ public Resource divideAndCeil(Resource numerator, int denominator) { @Override public Resource normalize(Resource r, Resource minimumResource, - Resource maximumResource) { + Resource maximumResource, Resource stepFactor) { int normalizedMemory = Math.min( roundUp( Math.max(r.getMemory(), minimumResource.getMemory()), - minimumResource.getMemory()), + stepFactor.getMemory()), maximumResource.getMemory()); return Resources.createResource(normalizedMemory); } @Override - public Resource roundUp(Resource r, Resource minimumResource) { + public Resource normalize(Resource r, Resource minimumResource, + Resource maximumResource) { + return normalize(r, minimumResource, maximumResource, minimumResource); + } + + @Override + public Resource roundUp(Resource r, Resource stepFactor) { return Resources.createResource( - roundUp(r.getMemory(),minimumResource.getMemory()) + roundUp(r.getMemory(), stepFactor.getMemory()) ); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java index 2f6699038fb..ef53e8dc14c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java @@ -124,26 +124,26 @@ public Resource divideAndCeil(Resource numerator, int denominator) { @Override public Resource normalize(Resource r, Resource minimumResource, - Resource maximumResource) { + Resource maximumResource, Resource stepFactor) { int normalizedMemory = Math.min( - roundUp( - Math.max(r.getMemory(), minimumResource.getMemory()), - minimumResource.getMemory()), - maximumResource.getMemory()); + roundUp( + Math.max(r.getMemory(), minimumResource.getMemory()), + stepFactor.getMemory()), + maximumResource.getMemory()); int normalizedCores = Math.min( - roundUp( - Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()), - minimumResource.getVirtualCores()), - maximumResource.getVirtualCores()); + roundUp( + Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()), + stepFactor.getVirtualCores()), + maximumResource.getVirtualCores()); return Resources.createResource(normalizedMemory, - normalizedCores); + normalizedCores); } @Override - public Resource roundUp(Resource r, Resource minimumResource) { + public Resource roundUp(Resource r, Resource stepFactor) { return Resources.createResource( - roundUp(r.getMemory(), minimumResource.getMemory()), - roundUp(r.getVirtualCores(), minimumResource.getVirtualCores()) + roundUp(r.getMemory(), stepFactor.getMemory()), + roundUp(r.getVirtualCores(), stepFactor.getVirtualCores()) ); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java index b2dd19bbf5c..de3777df5f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java @@ -96,8 +96,26 @@ public abstract Resource multiplyAndNormalizeDown( * @param maximumResource the upper bound of the resource to be allocated * @return normalized resource */ + public Resource normalize(Resource r, Resource minimumResource, + Resource maximumResource) { + return normalize(r, minimumResource, maximumResource, minimumResource); + } + + /** + * Normalize resource r given the base + * minimumResource and verify against max allowed + * maximumResource using a step factor for hte normalization. + * + * @param r resource + * @param minimumResource minimum value + * @param maximumResource the upper bound of the resource to be allocated + * @param stepFactor the increment for resources to be allocated + * @return normalized resource + */ public abstract Resource normalize(Resource r, Resource minimumResource, - Resource maximumResource); + Resource maximumResource, + Resource stepFactor); + /** * Round-up resource r given factor stepFactor. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java index 58cd676b235..5e67098d4f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java @@ -132,9 +132,9 @@ public static Resource multiplyAndRoundDown(Resource lhs, double by) { } public static Resource normalize( - ResourceCalculator calculator, Resource lhs, Resource factor, - Resource limit) { - return calculator.normalize(lhs, factor, limit); + ResourceCalculator calculator, Resource lhs, Resource min, + Resource max, Resource increment) { + return calculator.normalize(lhs, min, max, increment); } public static Resource roundUp( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 572252b277d..d24a7d02f00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -82,6 +82,40 @@ public static ContainerStatus createAbnormalContainerStatus( return containerStatus; } + /** + * Utility method to normalize a list of resource requests, by insuring that + * the memory for each request is a multiple of minMemory and is not zero. + */ + public static void normalizeRequests( + List asks, + ResourceCalculator resourceCalculator, + Resource clusterResource, + Resource minimumResource, + Resource maximumResource) { + for (ResourceRequest ask : asks) { + normalizeRequest( + ask, resourceCalculator, clusterResource, minimumResource, + maximumResource, minimumResource); + } + } + + /** + * Utility method to normalize a resource request, by insuring that the + * requested memory is a multiple of minMemory and is not zero. + */ + public static void normalizeRequest( + ResourceRequest ask, + ResourceCalculator resourceCalculator, + Resource clusterResource, + Resource minimumResource, + Resource maximumResource) { + Resource normalized = + Resources.normalize( + resourceCalculator, ask.getCapability(), minimumResource, + maximumResource, minimumResource); + ask.setCapability(normalized); + } + /** * Utility method to normalize a list of resource requests, by insuring that * the memory for each request is a multiple of minMemory and is not zero. @@ -91,11 +125,12 @@ public static void normalizeRequests( ResourceCalculator resourceCalculator, Resource clusterResource, Resource minimumResource, - Resource maximumResource) { + Resource maximumResource, + Resource incrementResource) { for (ResourceRequest ask : asks) { normalizeRequest( ask, resourceCalculator, clusterResource, minimumResource, - maximumResource); + maximumResource, incrementResource); } } @@ -108,11 +143,12 @@ public static void normalizeRequest( ResourceCalculator resourceCalculator, Resource clusterResource, Resource minimumResource, - Resource maximumResource) { + Resource maximumResource, + Resource incrementResource) { Resource normalized = Resources.normalize( resourceCalculator, ask.getCapability(), minimumResource, - maximumResource); + maximumResource, incrementResource); ask.setCapability(normalized); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 79e97bb7b5a..a0af0e1ad6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -116,6 +116,7 @@ public class FairScheduler implements ResourceScheduler { private RMContext rmContext; private Resource minimumAllocation; private Resource maximumAllocation; + private Resource incrAllocation; private QueueManager queueMgr; private Clock clock; @@ -300,7 +301,7 @@ private void updatePreemptionVariables() { */ boolean isStarvedForMinShare(FSLeafQueue sched) { Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, - sched.getMinShare(), sched.getDemand()); + sched.getMinShare(), sched.getDemand()); return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, sched.getResourceUsage(), desiredShare); } @@ -560,6 +561,10 @@ public Resource getMinimumResourceCapability() { return minimumAllocation; } + public Resource getIncrementResourceCapability() { + return incrAllocation; + } + @Override public Resource getMaximumResourceCapability() { return maximumAllocation; @@ -769,7 +774,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Sanity check SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), - clusterCapacity, minimumAllocation, maximumAllocation); + clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation); // Release containers for (ContainerId releasedContainerId : release) { @@ -1028,6 +1033,7 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext) validateConf(this.conf); minimumAllocation = this.conf.getMinimumAllocation(); maximumAllocation = this.conf.getMaximumAllocation(); + incrAllocation = this.conf.getIncrementAllocation(); userAsDefaultQueue = this.conf.getUserAsDefaultQueue(); nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); rackLocalityThreshold = this.conf.getLocalityThresholdRack(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 52f0d8faef3..6957f15bdad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -32,6 +32,16 @@ @Private @Evolving public class FairSchedulerConfiguration extends Configuration { + + /** Increment request grant-able by the RM scheduler. + * These properties are looked up in the yarn-site.xml */ + public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB = + YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-mb"; + public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB = 1024; + public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = + YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores"; + public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1; + public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml"; private static final String CONF_PREFIX = "yarn.scheduler.fair."; @@ -102,6 +112,16 @@ public Resource getMaximumAllocation() { return Resources.createResource(mem, cpu); } + public Resource getIncrementAllocation() { + int incrementMemory = getInt( + RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB); + int incrementCores = getInt( + RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES); + return Resources.createResource(incrementMemory, incrementCores); + } + public boolean getUserAsDefaultQueue() { return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 4c199d5f147..9928c0067f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -81,6 +82,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Before; @@ -120,7 +123,9 @@ public void tick(int seconds) { public void setUp() throws IOException { scheduler = new FairScheduler(); Configuration conf = createConfiguration(); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1024); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); + conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + 1024); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240); // All tests assume only one assignment per node update conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); @@ -281,6 +286,8 @@ public void testLoadConfigurationOnInitialize() throws IOException { conf.setFloat(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7f); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + 128); scheduler.reinitialize(conf, resourceManager.getRMContext()); Assert.assertEquals(true, scheduler.assignMultiple); Assert.assertEquals(3, scheduler.maxAssign); @@ -289,8 +296,44 @@ public void testLoadConfigurationOnInitialize() throws IOException { Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01); Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory()); Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory()); + Assert.assertEquals(128, + scheduler.getIncrementResourceCapability().getMemory()); } + @Test + public void testNonMinZeroResourcesSettings() throws IOException { + FairScheduler fs = new FairScheduler(); + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 256); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); + fs.reinitialize(conf, null); + Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory()); + Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores()); + Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); + Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores()); + } + + @Test + public void testMinZeroResourcesSettings() throws IOException { + FairScheduler fs = new FairScheduler(); + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 0); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); + fs.reinitialize(conf, null); + Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory()); + Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores()); + Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); + Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores()); + } + @Test public void testAggregateCapacityTracking() throws Exception { // Add a node @@ -412,8 +455,8 @@ public void testSimpleContainerAllocation() { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - // Asked for less than min_allocation. - assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + // Asked for less than increment allocation. + assertEquals(FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, scheduler.getQueueManager().getQueue("queue1"). getResourceUsage().getMemory()); @@ -571,7 +614,8 @@ public void testQueueDemandCalculation() throws Exception { ApplicationAttemptId id22 = createAppAttemptId(2, 2); scheduler.addApplication(id22, "root.queue2", "user1"); - int minReqSize = YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB; + int minReqSize = + FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; // First ask, queue1 requests 1 large (minReqSize * 2). List ask1 = new ArrayList();