From 355ec3341685deb93a40daffa3f28965b7609286 Mon Sep 17 00:00:00 2001
From: Eric Badger
Date: Fri, 20 Dec 2019 19:32:36 +0000
Subject: [PATCH] YARN-10009. In Capacity Scheduler, DRC can treat minimum
 user limit percent as a max when custom resource is defined. Contributed by
 Eric Payne

---
 .../resource/DominantResourceCalculator.java  |  12 +-
 ...pacitySchedulerWithMultiResourceTypes.java | 140 ++++++++++++++++++
 2 files changed, 148 insertions(+), 4 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index e4721493d2f..f2ebffcee9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -428,10 +428,14 @@ public class DominantResourceCalculator extends ResourceCalculator {
 
   @Override
   public Resource divideAndCeil(Resource numerator, float denominator) {
-    return Resources.createResource(
-        divideAndCeil(numerator.getMemorySize(), denominator),
-        divideAndCeil(numerator.getVirtualCores(), denominator)
-    );
+    Resource ret = Resource.newInstance(numerator);
+    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    for (int i = 0; i < maxLength; i++) {
+      ResourceInformation resourceInformation = ret.getResourceInformation(i);
+      resourceInformation
+          .setValue(divideAndCeil(resourceInformation.getValue(), denominator));
+    }
+    return ret;
   }
 
   @Override
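
The divideAndCeil change above stops rebuilding the result from memory and vcores alone and instead walks every registered resource type, dividing each value and rounding it up, so a custom resource such as res_1 is scaled the same way as the mandatory ones. The following standalone sketch (plain Java, not part of the patch; the class name and resource values are made up, and ordinary maps stand in for YARN's Resource/ResourceInformation classes) illustrates that per-type divide-and-ceil behaviour:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DivideAndCeilSketch {

      // Division rounded up, mirroring the long/float overload the calculator
      // applies to each resource type.
      static long divideAndCeil(long numerator, float denominator) {
        return (long) Math.ceil(numerator / denominator);
      }

      public static void main(String[] args) {
        // Hypothetical resource vector with a custom type next to memory/vcores.
        Map<String, Long> resource = new LinkedHashMap<>();
        resource.put("memory-mb", 10240L);
        resource.put("vcores", 10L);
        resource.put("res_1", 80L);

        float denominator = 3f;

        // The old code built the result from memory and vcores only, so a
        // custom type like res_1 never went through the division. The fix
        // iterates over every known resource type, as sketched here:
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : resource.entrySet()) {
          result.put(entry.getKey(),
              divideAndCeil(entry.getValue(), denominator));
        }

        // Prints {memory-mb=3414, vcores=4, res_1=27}: every dimension,
        // including the custom one, is divided and rounded up.
        System.out.println(result);
      }
    }

In the scheduler this result feeds the per-user limit computation, which is why leaving custom types out of the division could make the minimum user limit percent behave like a maximum for them, as the subject line describes.
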
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWithMultiResourceTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWithMultiResourceTypes.java
index 720e7872540..fa2b99354c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWithMultiResourceTypes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWithMultiResourceTypes.java
@@ -18,7 +18,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -28,12 +36,24 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -343,4 +363,124 @@ public class TestCapacitySchedulerWithMultiResourceTypes {
     Assert.assertEquals(0, report_nm1.getAvailableResource()
         .getResourceInformation(ResourceInformation.GPU_URI).getValue());
   }
+
+
+  @Test(timeout = 300000)
+  public void testConsumeAllExtendedResourcesWithSmallMinUserLimitPct()
+      throws Exception {
+    int GB = 1024;
+
+    // Initialize resource map for 3 types.
+    Map<String, ResourceInformation> riMap = new HashMap<>();
+
+    // Initialize mandatory resources.
+    ResourceInformation memory = ResourceInformation.newInstance(
+        ResourceInformation.MEMORY_MB.getName(),
+        ResourceInformation.MEMORY_MB.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    ResourceInformation vcores = ResourceInformation.newInstance(
+        ResourceInformation.VCORES.getName(),
+        ResourceInformation.VCORES.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    ResourceInformation res1 = ResourceInformation.newInstance("res_1",
+        "", 0, 10);
+    riMap.put(ResourceInformation.MEMORY_URI, memory);
+    riMap.put(ResourceInformation.VCORES_URI, vcores);
+    riMap.put("res_1", res1);
+
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.set("yarn.resource-types", "res_1");
+    csconf.set("yarn.resource-types.res_1.minimum-allocation", "0");
+    csconf.set("yarn.resource-types.res_1.maximum-allocation", "10");
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+
+    // Define top-level queues.
+    csconf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a", "b"});
+
+    // Set each queue to contain 50% each.
+    csconf.setCapacity(A_QUEUE, A_CAPACITY);
+    csconf.setCapacity(B_QUEUE, B_CAPACITY);
+    csconf.setMaximumCapacity(A_QUEUE, 100.0f);
+    csconf.setUserLimitFactor(A_QUEUE, 2);
+
+    YarnConfiguration yarnConf = new YarnConfiguration(csconf);
+    // Don't reset resource types since we have already configured resource
+    // types.
+    yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES,
+        false);
+    yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    MockRM rm = new MockRM(yarnConf);
+    rm.start();
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    LeafQueue qb = (LeafQueue) cs.getQueue("a");
+    // Setting minimum user limit percent should not affect the max user
+    // resource limit when using extended resources with DRF (see YARN-10009).
+    qb.setUserLimit(25);
+
+    // Add app 1.
+    ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+
+    RMAppAttemptMetrics attemptMetric =
+        new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
+    RMAppImpl app = mock(RMAppImpl.class);
+    when(app.getApplicationId()).thenReturn(appId);
+    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
+    Container container = mock(Container.class);
+    when(attempt.getMasterContainer()).thenReturn(container);
+    ApplicationSubmissionContext submissionContext = mock(
+        ApplicationSubmissionContext.class);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
+    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
+    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
+    when(app.getCurrentAppAttempt()).thenReturn(attempt);
+
+    rm.getRMContext().getRMApps().put(appId, app);
+
+    SchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(appId, "a", "user1");
+    cs.handle(addAppEvent);
+    SchedulerEvent addAttemptEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
+    cs.handle(addAttemptEvent);
+
+    // Add a node to the cluster. The cluster then has 2048GB memory,
+    // 100 vcores, and 80 res_1.
+    Map<String, Long> resMap = new HashMap<>();
+    resMap.put("res_1", 80L);
+    Resource newResource = Resource.newInstance(2048 * GB, 100, resMap);
+    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
+    cs.handle(new NodeAddedSchedulerEvent(node));
+
+    FiCaSchedulerApp fiCaApp1 =
+        cs.getSchedulerApplications().get(app.getApplicationId())
+            .getCurrentAppAttempt();
+
+    // Allocate 8 containers for app1, each with 1GB memory, 1 vcore,
+    // and 10 res_1.
+    for (int i = 0; i < 8; i++) {
+      fiCaApp1.updateResourceRequests(Collections.singletonList(
+          ResourceRequest.newBuilder()
+              .capability(TestUtils.createResource(1 * GB, 1,
+                  ImmutableMap.of("res_1", 10)))
+              .numContainers(1)
+              .resourceName("*")
+              .build()));
+      cs.handle(new NodeUpdateSchedulerEvent(node));
+    }
+    Assert.assertEquals(8 * GB,
+        fiCaApp1.getCurrentConsumption().getMemorySize());
+    Assert.assertEquals(80,
+        fiCaApp1.getCurrentConsumption()
+            .getResourceInformation("res_1").getValue());
+
+    rm.close();
+  }
 }
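
Taken together, the new test registers res_1 as a third resource type, switches the queue comparator to the DominantResourceCalculator, sets queue a's minimum user limit percent to 25 (with a user limit factor of 2), and then has a single user request 8 containers of 1GB / 1 vcore / 10 res_1. The assertions expect that user to hold 8GB of memory and all 80 res_1 on the node, i.e. the minimum user limit percent must not cap the custom resource. To run only this test, a typical Maven invocation (assuming a standard Hadoop build environment; the command itself is not part of the patch) would be:

    cd hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager
    mvn test -Dtest=TestCapacitySchedulerWithMultiResourceTypes#testConsumeAllExtendedResourcesWithSmallMinUserLimitPct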