YARN-7739. DefaultAMSProcessor should properly check customized resource types against minimum/maximum allocation. (wangda)

Change-Id: I10cc9341237d9a2fc0f8c855efb98a36b91389e2
(cherry picked from commit d02e42cee4)
This commit is contained in:
Wangda Tan 2018-02-12 10:29:37 +08:00 committed by Jonathan Hung
parent d1b91ede4c
commit b5128b2ae2
3 changed files with 206 additions and 20 deletions

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@ -289,23 +291,23 @@ public class SchedulerUtils {
private static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
throws InvalidResourceRequestException {
if (resReq.getCapability().getMemorySize() < 0 ||
resReq.getCapability().getMemorySize() > maximumResource.getMemorySize()) {
throw new InvalidResourceRequestException("Invalid resource request"
+ ", requested memory < 0"
+ ", or requested memory > max configured"
+ ", requestedMemory=" + resReq.getCapability().getMemorySize()
+ ", maxMemory=" + maximumResource.getMemorySize());
}
if (resReq.getCapability().getVirtualCores() < 0 ||
resReq.getCapability().getVirtualCores() >
maximumResource.getVirtualCores()) {
throw new InvalidResourceRequestException("Invalid resource request"
+ ", requested virtual cores < 0"
+ ", or requested virtual cores > max configured"
+ ", requestedVirtualCores="
+ resReq.getCapability().getVirtualCores()
+ ", maxVirtualCores=" + maximumResource.getVirtualCores());
Resource requestedResource = resReq.getCapability();
for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
ResourceInformation reqRI = requestedResource.getResourceInformation(i);
ResourceInformation maxRI = maximumResource.getResourceInformation(i);
if (reqRI.getValue() < 0 || reqRI.getValue() > maxRI.getValue()) {
throw new InvalidResourceRequestException(
"Invalid resource request, requested resource type=[" + reqRI
.getName()
+ "] < 0 or greater than maximum allowed allocation. Requested "
+ "resource=" + requestedResource
+ ", maximum allowed allocation=" + maximumResource
+ ", please note that maximum allowed allocation is calculated "
+ "by scheduler based on maximum resource of registered "
+ "NodeManagers, which might be less than configured "
+ "maximum allocation=" + ResourceUtils
.getResourceTypesMaximumAllocation());
}
}
String labelExp = resReq.getNodeLabelExpression();
// we don't allow specify label expression other than resourceName=ANY now

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static java.lang.Thread.sleep;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
import java.io.IOException;
import java.util.ArrayList;
@ -29,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@ -48,11 +52,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@ -62,13 +69,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
@ -665,6 +677,176 @@ public class TestApplicationMasterService {
rm.stop();
}
@Test(timeout = 300000)
public void testCSValidateRequestCapacityAgainstMinMaxAllocation()
throws Exception {
testValidateRequestCapacityAgainstMinMaxAllocation(CapacityScheduler.class);
}
@Test(timeout = 300000)
public void testFSValidateRequestCapacityAgainstMinMaxAllocation()
throws Exception {
testValidateRequestCapacityAgainstMinMaxAllocation(FairScheduler.class);
}
private void testValidateRequestCapacityAgainstMinMaxAllocation(Class<?> schedulerCls)
throws Exception {
// Initialize resource map for 2 types.
Map<String, ResourceInformation> riMap = new HashMap<>();
// Initialize mandatory resources
ResourceInformation memory = ResourceInformation.newInstance(
ResourceInformation.MEMORY_MB.getName(),
ResourceInformation.MEMORY_MB.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
ResourceInformation vcores = ResourceInformation.newInstance(
ResourceInformation.VCORES.getName(),
ResourceInformation.VCORES.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
riMap.put(ResourceInformation.MEMORY_URI, memory);
riMap.put(ResourceInformation.VCORES_URI, vcores);
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setResourceComparator(DominantResourceCalculator.class);
YarnConfiguration conf = new YarnConfiguration(csconf);
// Don't reset resource types since we have already configured resource
// types
conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerCls,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils
.createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null));
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
// Now request resource, memory > allowed
boolean exception = false;
try {
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
Resource.newInstance(9 * GB, 1)).numContainers(1).resourceName("*")
.build()), null);
} catch (InvalidResourceRequestException e) {
exception = true;
}
Assert.assertTrue(exception);
exception = false;
try {
// Now request resource, vcore > allowed
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
Resource.newInstance(8 * GB, 18)).numContainers(1).resourceName("*")
.build()), null);
} catch (InvalidResourceRequestException e) {
exception = true;
}
Assert.assertTrue(exception);
rm.close();
}
@Test(timeout = 300000)
public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypes()
throws Exception {
// Initialize resource map for 2 types.
Map<String, ResourceInformation> riMap = new HashMap<>();
// Initialize mandatory resources
ResourceInformation memory = ResourceInformation.newInstance(
ResourceInformation.MEMORY_MB.getName(),
ResourceInformation.MEMORY_MB.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
ResourceInformation vcores = ResourceInformation.newInstance(
ResourceInformation.VCORES.getName(),
ResourceInformation.VCORES.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
ResourceInformation res_1 = ResourceInformation.newInstance("res_1",
ResourceInformation.VCORES.getUnits(), 0, 4);
riMap.put(ResourceInformation.MEMORY_URI, memory);
riMap.put(ResourceInformation.VCORES_URI, vcores);
riMap.put("res_1", res_1);
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setResourceComparator(DominantResourceCalculator.class);
YarnConfiguration conf = new YarnConfiguration(csconf);
// Don't reset resource types since we have already configured resource
// types
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils
.createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
ImmutableMap.of("res_1", 4)));
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
Assert.assertEquals(Resource.newInstance(1 * GB, 1),
leafQueue.getUsedResources());
// Now request resource, memory > allowed
boolean exception = false;
try {
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
TestUtils.createResource(9 * GB, 1, ImmutableMap.of("res_1", 1)))
.numContainers(1).resourceName("*").build()), null);
} catch (InvalidResourceRequestException e) {
exception = true;
}
Assert.assertTrue(exception);
exception = false;
try {
// Now request resource, vcore > allowed
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1)))
.numContainers(1).resourceName("*")
.build()), null);
} catch (InvalidResourceRequestException e) {
exception = true;
}
Assert.assertTrue(exception);
exception = false;
try {
// Now request resource, res_1 > allowed
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
TestUtils.createResource(8 * GB, 1, ImmutableMap.of("res_1", 100)))
.numContainers(1).resourceName("*")
.build()), null);
} catch (InvalidResourceRequestException e) {
exception = true;
}
Assert.assertTrue(exception);
rm.close();
}
private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
RMContainer rmContainer = cs.getRMContainer(containerId);

View File

@ -467,9 +467,11 @@ public class TestUtils {
public static Resource createResource(long memory, int vcores,
Map<String, Integer> nameToValues) {
Resource res = Resource.newInstance(memory, vcores);
for (Map.Entry<String, Integer> entry : nameToValues.entrySet()) {
res.setResourceInformation(entry.getKey(), ResourceInformation
.newInstance(entry.getKey(), "", entry.getValue()));
if (nameToValues != null) {
for (Map.Entry<String, Integer> entry : nameToValues.entrySet()) {
res.setResourceInformation(entry.getKey(), ResourceInformation
.newInstance(entry.getKey(), "", entry.getValue()));
}
}
return res;
}