YARN-10009. In Capacity Scheduler, DRC can treat minimum user limit percent as a max when custom resource is defined. Contributed by Eric Payne
This commit is contained in:
parent
11c5396b4a
commit
355ec33416
@ -428,10 +428,14 @@ public Resource divideAndCeil(Resource numerator, long denominator) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource divideAndCeil(Resource numerator, float denominator) {
|
public Resource divideAndCeil(Resource numerator, float denominator) {
|
||||||
return Resources.createResource(
|
Resource ret = Resource.newInstance(numerator);
|
||||||
divideAndCeil(numerator.getMemorySize(), denominator),
|
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||||
divideAndCeil(numerator.getVirtualCores(), denominator)
|
for (int i = 0; i < maxLength; i++) {
|
||||||
);
|
ResourceInformation resourceInformation = ret.getResourceInformation(i);
|
||||||
|
resourceInformation
|
||||||
|
.setValue(divideAndCeil(resourceInformation.getValue(), denominator));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -18,7 +18,15 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
@ -28,12 +36,24 @@
|
|||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
@ -343,4 +363,124 @@ public void testMaxLimitsOfQueueWithMultipleResources() throws Exception {
|
|||||||
Assert.assertEquals(0, report_nm1.getAvailableResource()
|
Assert.assertEquals(0, report_nm1.getAvailableResource()
|
||||||
.getResourceInformation(ResourceInformation.GPU_URI).getValue());
|
.getResourceInformation(ResourceInformation.GPU_URI).getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testConsumeAllExtendedResourcesWithSmallMinUserLimitPct()
|
||||||
|
throws Exception {
|
||||||
|
int GB = 1024;
|
||||||
|
|
||||||
|
// Initialize resource map for 3 types.
|
||||||
|
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||||
|
|
||||||
|
// Initialize mandatory resources
|
||||||
|
ResourceInformation memory = ResourceInformation.newInstance(
|
||||||
|
ResourceInformation.MEMORY_MB.getName(),
|
||||||
|
ResourceInformation.MEMORY_MB.getUnits(),
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||||
|
ResourceInformation vcores = ResourceInformation.newInstance(
|
||||||
|
ResourceInformation.VCORES.getName(),
|
||||||
|
ResourceInformation.VCORES.getUnits(),
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||||
|
ResourceInformation res1 = ResourceInformation.newInstance("res_1",
|
||||||
|
"", 0, 10);
|
||||||
|
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
||||||
|
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
||||||
|
riMap.put("res_1", res1);
|
||||||
|
|
||||||
|
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration csconf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
csconf.set("yarn.resource-types", "res_1");
|
||||||
|
csconf.set("yarn.resource-types.res_1.minimum-allocation", "0");
|
||||||
|
csconf.set("yarn.resource-types.res_1.maximum-allocation", "10");
|
||||||
|
csconf.setResourceComparator(DominantResourceCalculator.class);
|
||||||
|
|
||||||
|
// Define top-level queues
|
||||||
|
csconf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[] {"a", "b"});
|
||||||
|
|
||||||
|
// Set each queue to contain 50% each.
|
||||||
|
csconf.setCapacity(A_QUEUE, A_CAPACITY);
|
||||||
|
csconf.setCapacity(B_QUEUE, B_CAPACITY);
|
||||||
|
csconf.setMaximumCapacity(A_QUEUE, 100.0f);
|
||||||
|
csconf.setUserLimitFactor(A_QUEUE, 2);
|
||||||
|
|
||||||
|
YarnConfiguration yarnConf = new YarnConfiguration(csconf);
|
||||||
|
// Don't reset resource types since we have already configured resource
|
||||||
|
// types
|
||||||
|
yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES,
|
||||||
|
false);
|
||||||
|
yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
|
||||||
|
MockRM rm = new MockRM(yarnConf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
LeafQueue qb = (LeafQueue)cs.getQueue("a");
|
||||||
|
// Setting minimum user limit percent should not affect max user resource
|
||||||
|
// limit using extended resources with DRF (see YARN-10009).
|
||||||
|
qb.setUserLimit(25);
|
||||||
|
|
||||||
|
// add app 1
|
||||||
|
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||||
|
|
||||||
|
RMAppAttemptMetrics attemptMetric =
|
||||||
|
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
|
||||||
|
RMAppImpl app = mock(RMAppImpl.class);
|
||||||
|
when(app.getApplicationId()).thenReturn(appId);
|
||||||
|
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
when(attempt.getMasterContainer()).thenReturn(container);
|
||||||
|
ApplicationSubmissionContext submissionContext = mock(
|
||||||
|
ApplicationSubmissionContext.class);
|
||||||
|
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
|
||||||
|
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
|
||||||
|
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
|
||||||
|
when(app.getCurrentAppAttempt()).thenReturn(attempt);
|
||||||
|
|
||||||
|
rm.getRMContext().getRMApps().put(appId, app);
|
||||||
|
|
||||||
|
SchedulerEvent addAppEvent =
|
||||||
|
new AppAddedSchedulerEvent(appId, "a", "user1");
|
||||||
|
cs.handle(addAppEvent);
|
||||||
|
SchedulerEvent addAttemptEvent =
|
||||||
|
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
|
||||||
|
cs.handle(addAttemptEvent);
|
||||||
|
|
||||||
|
// add nodes to cluster. Cluster has 20GB, 20 vcores, 80 res_1s.
|
||||||
|
HashMap<String, Long> resMap = new HashMap<String, Long>();
|
||||||
|
resMap.put("res_1", 80L);
|
||||||
|
Resource newResource = Resource.newInstance(2048 * GB, 100, resMap);
|
||||||
|
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
|
||||||
|
cs.handle(new NodeAddedSchedulerEvent(node));
|
||||||
|
|
||||||
|
FiCaSchedulerApp fiCaApp1 =
|
||||||
|
cs.getSchedulerApplications().get(app.getApplicationId())
|
||||||
|
.getCurrentAppAttempt();
|
||||||
|
|
||||||
|
// allocate 8 containers for app1 with 1GB memory, 1 vcore, 10 res_1s
|
||||||
|
for (int i = 0; i < 8; i++) {
|
||||||
|
fiCaApp1.updateResourceRequests(Collections.singletonList(
|
||||||
|
ResourceRequest.newBuilder()
|
||||||
|
.capability(TestUtils.createResource(1 * GB, 1,
|
||||||
|
ImmutableMap.of("res_1", 10)))
|
||||||
|
.numContainers(1)
|
||||||
|
.resourceName("*")
|
||||||
|
.build()));
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(node));
|
||||||
|
}
|
||||||
|
Assert.assertEquals(8*GB, fiCaApp1.getCurrentConsumption().getMemorySize());
|
||||||
|
Assert.assertEquals(80,
|
||||||
|
fiCaApp1.getCurrentConsumption()
|
||||||
|
.getResourceInformation("res_1").getValue());
|
||||||
|
|
||||||
|
rm.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user