YARN-10009. In Capacity Scheduler, DRC can treat minimum user limit percent as a max when custom resource is defined. Contributed by Eric Payne.

This commit is contained in:
Eric Badger 2019-12-20 18:07:00 +00:00
parent f777cd398f
commit 412035b47a
2 changed files with 150 additions and 4 deletions

View File

@ -432,10 +432,14 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override
public Resource divideAndCeil(Resource numerator, float denominator) {
return Resources.createResource(
divideAndCeil(numerator.getMemorySize(), denominator),
divideAndCeil(numerator.getVirtualCores(), denominator)
);
Resource ret = Resource.newInstance(numerator);
int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
for (int i = 0; i < maxLength; i++) {
ResourceInformation resourceInformation = ret.getResourceInformation(i);
resourceInformation
.setValue(divideAndCeil(resourceInformation.getValue(), denominator));
}
return ret;
}
@Override

View File

@ -19,6 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -28,20 +32,38 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@ -351,4 +373,124 @@ public class TestCapacitySchedulerWithMultiResourceTypes {
Assert.assertEquals(0, report_nm1.getAvailableResource()
.getResourceInformation(ResourceInformation.GPU_URI).getValue());
}
@Test(timeout = 300000)
public void testConsumeAllExtendedResourcesWithSmallMinUserLimitPct()
throws Exception {
int GB = 1024;
// Initialize resource map for 3 types.
Map<String, ResourceInformation> riMap = new HashMap<>();
// Initialize mandatory resources
ResourceInformation memory = ResourceInformation.newInstance(
ResourceInformation.MEMORY_MB.getName(),
ResourceInformation.MEMORY_MB.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
ResourceInformation vcores = ResourceInformation.newInstance(
ResourceInformation.VCORES.getName(),
ResourceInformation.VCORES.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
ResourceInformation res1 = ResourceInformation.newInstance("res_1",
"", 0, 10);
riMap.put(ResourceInformation.MEMORY_URI, memory);
riMap.put(ResourceInformation.VCORES_URI, vcores);
riMap.put("res_1", res1);
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.set("yarn.resource-types", "res_1");
csconf.set("yarn.resource-types.res_1.minimum-allocation", "0");
csconf.set("yarn.resource-types.res_1.maximum-allocation", "10");
csconf.setResourceComparator(DominantResourceCalculator.class);
// Define top-level queues
csconf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"a", "b"});
// Set each queue to contain 50% each.
csconf.setCapacity(A_QUEUE, A_CAPACITY);
csconf.setCapacity(B_QUEUE, B_CAPACITY);
csconf.setMaximumCapacity(A_QUEUE, 100.0f);
csconf.setUserLimitFactor(A_QUEUE, 2);
YarnConfiguration yarnConf = new YarnConfiguration(csconf);
// Don't reset resource types since we have already configured resource
// types
yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES,
false);
yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(yarnConf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue qb = (LeafQueue)cs.getQueue("a");
// Setting minimum user limit percent should not affect max user resource
// limit using extended resources with DRF (see YARN-10009).
qb.setUserLimit(25);
// add app 1
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
RMAppAttemptMetrics attemptMetric =
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
rm.getRMContext().getRMApps().put(appId, app);
SchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, "a", "user1");
cs.handle(addAppEvent);
SchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
cs.handle(addAttemptEvent);
// add nodes to cluster. Cluster has 20GB, 20 vcores, 80 res_1s.
HashMap<String, Long> resMap = new HashMap<String, Long>();
resMap.put("res_1", 80L);
Resource newResource = Resource.newInstance(2048 * GB, 100, resMap);
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
cs.handle(new NodeAddedSchedulerEvent(node));
FiCaSchedulerApp fiCaApp1 =
cs.getSchedulerApplications().get(app.getApplicationId())
.getCurrentAppAttempt();
// allocate 8 containers for app1 with 1GB memory, 1 vcore, 10 res_1s
for (int i = 0; i < 8; i++) {
fiCaApp1.updateResourceRequests(Collections.singletonList(
ResourceRequest.newBuilder()
.capability(TestUtils.createResource(1 * GB, 1,
ImmutableMap.of("res_1", 10)))
.numContainers(1)
.resourceName("*")
.build()));
cs.handle(new NodeUpdateSchedulerEvent(node));
}
assertEquals(8*GB, fiCaApp1.getCurrentConsumption().getMemorySize());
assertEquals(80,
fiCaApp1.getCurrentConsumption()
.getResourceInformation("res_1").getValue());
rm.close();
}
}