YARN-8248. Job hangs when a job requests a resource that its queue does not have. (Szilard Nemeth via Haibo Chen)

This commit is contained in:
Haibo Chen 2018-05-21 08:00:21 -07:00
parent 3d2d9dbcaa
commit f48fec83d0
6 changed files with 484 additions and 78 deletions

View File

@ -18,9 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions
.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
@ -61,12 +68,37 @@ import org.apache.hadoop.yarn.util.resource.Resources;
@Unstable
public class SchedulerUtils {
/**
* This class contains invalid resource information along with its
* resource request.
*/
public static class MaxResourceValidationResult {
private ResourceRequest resourceRequest;
private List<ResourceInformation> invalidResources;
MaxResourceValidationResult(ResourceRequest resourceRequest,
List<ResourceInformation> invalidResources) {
this.resourceRequest = resourceRequest;
this.invalidResources = invalidResources;
}
public boolean isValid() {
return invalidResources.isEmpty();
}
@Override
public String toString() {
return "MaxResourceValidationResult{" + "resourceRequest="
+ resourceRequest + ", invalidResources=" + invalidResources + '}';
}
}
private static final Log LOG = LogFactory.getLog(SchedulerUtils.class);
private static final RecordFactory recordFactory =
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
public static final String RELEASED_CONTAINER =
public static final String RELEASED_CONTAINER =
"Container released by application";
public static final String UPDATED_CONTAINER =
@ -325,6 +357,22 @@ public class SchedulerUtils {
}
}
private static Map<String, ResourceInformation> getZeroResources(
Resource resource) {
Map<String, ResourceInformation> resourceInformations = Maps.newHashMap();
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) {
ResourceInformation resourceInformation =
resource.getResourceInformation(i);
if (resourceInformation.getValue() == 0L) {
resourceInformations.put(resourceInformation.getName(),
resourceInformation);
}
}
return resourceInformations;
}
@Private
@VisibleForTesting
static void checkResourceRequestAgainstAvailableResource(Resource reqResource,
@ -339,49 +387,88 @@ public class SchedulerUtils {
reqResourceName);
}
final ResourceInformation availableRI =
availableResource.getResourceInformation(reqResourceName);
long requestedResourceValue = requestedRI.getValue();
long availableResourceValue = availableRI.getValue();
int unitsRelation = UnitsConversionUtil
.compareUnits(requestedRI.getUnits(), availableRI.getUnits());
if (LOG.isDebugEnabled()) {
LOG.debug("Requested resource information: " + requestedRI);
LOG.debug("Available resource information: " + availableRI);
LOG.debug("Relation of units: " + unitsRelation);
}
// requested resource unit is less than available resource unit
// e.g. requestedUnit: "m", availableUnit: "K")
if (unitsRelation < 0) {
availableResourceValue =
UnitsConversionUtil.convert(availableRI.getUnits(),
requestedRI.getUnits(), availableRI.getValue());
// requested resource unit is greater than available resource unit
// e.g. requestedUnit: "G", availableUnit: "M")
} else if (unitsRelation > 0) {
requestedResourceValue =
UnitsConversionUtil.convert(requestedRI.getUnits(),
availableRI.getUnits(), requestedRI.getValue());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Requested resource value after conversion: " +
requestedResourceValue);
LOG.info("Available resource value after conversion: " +
availableResourceValue);
}
if (requestedResourceValue > availableResourceValue) {
boolean valid = checkResource(requestedRI, availableResource);
if (!valid) {
throwInvalidResourceException(reqResource, availableResource,
reqResourceName);
}
}
}
public static MaxResourceValidationResult
validateResourceRequestsAgainstQueueMaxResource(
ResourceRequest resReq, Resource availableResource)
throws SchedulerInvalidResoureRequestException {
final Resource reqResource = resReq.getCapability();
Map<String, ResourceInformation> resourcesWithZeroAmount =
getZeroResources(availableResource);
if (LOG.isTraceEnabled()) {
LOG.trace("Resources with zero amount: "
+ Arrays.toString(resourcesWithZeroAmount.entrySet().toArray()));
}
List<ResourceInformation> invalidResources = Lists.newArrayList();
for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
final ResourceInformation requestedRI =
reqResource.getResourceInformation(i);
final String reqResourceName = requestedRI.getName();
if (resourcesWithZeroAmount.containsKey(reqResourceName)
&& requestedRI.getValue() > 0) {
invalidResources.add(requestedRI);
}
}
return new MaxResourceValidationResult(resReq, invalidResources);
}
/**
* Checks requested ResouceInformation against available Resource.
* @param requestedRI
* @param availableResource
* @return true if request is valid, false otherwise.
*/
private static boolean checkResource(
ResourceInformation requestedRI, Resource availableResource) {
final ResourceInformation availableRI =
availableResource.getResourceInformation(requestedRI.getName());
long requestedResourceValue = requestedRI.getValue();
long availableResourceValue = availableRI.getValue();
int unitsRelation = UnitsConversionUtil.compareUnits(requestedRI.getUnits(),
availableRI.getUnits());
if (LOG.isDebugEnabled()) {
LOG.debug("Requested resource information: " + requestedRI);
LOG.debug("Available resource information: " + availableRI);
LOG.debug("Relation of units: " + unitsRelation);
}
// requested resource unit is less than available resource unit
// e.g. requestedUnit: "m", availableUnit: "K")
if (unitsRelation < 0) {
availableResourceValue =
UnitsConversionUtil.convert(availableRI.getUnits(),
requestedRI.getUnits(), availableRI.getValue());
// requested resource unit is greater than available resource unit
// e.g. requestedUnit: "G", availableUnit: "M")
} else if (unitsRelation > 0) {
requestedResourceValue =
UnitsConversionUtil.convert(requestedRI.getUnits(),
availableRI.getUnits(), requestedRI.getValue());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Requested resource value after conversion: "
+ requestedResourceValue);
LOG.info("Available resource value after conversion: "
+ availableResourceValue);
}
return requestedResourceValue <= availableResourceValue;
}
private static void throwInvalidResourceException(Resource reqResource,
Resource availableResource, String reqResourceName)
throws InvalidResourceRequestException {

View File

@ -459,7 +459,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Add it to allContainers list.
addToNewlyAllocatedContainers(node, rmContainer);
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
ContainerRequest containerRequest = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
@ -867,6 +866,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
if (reserved) {
unreserve(schedulerKey, node);
}
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"Resource ask %s fits in available node resources %s, " +
"but no container was allocated",
capability, available));
}
return Resources.none();
}
@ -1096,7 +1101,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
} else if (!getQueue().fitsInMaxShare(resource)) {
// The requested container must fit in queue maximum share
updateAMDiagnosticMsg(resource,
" exceeds current queue or its parents maximum resource allowed).");
" exceeds current queue or its parents maximum resource allowed). " +
"Max share of queue: " + getQueue().getMaxShare());
ret = false;
}

View File

@ -182,6 +182,9 @@ public class FSParentQueue extends FSQueue {
// If this queue is over its limit, reject
if (!assignContainerPreCheck(node)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Assign container precheck on node " + node + " failed");
}
return assigned;
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@ -42,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions
.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@ -73,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils.MaxResourceValidationResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -449,10 +453,7 @@ public class FairScheduler extends
String message =
"Reject application " + applicationId + " submitted by user " + user
+ " with an empty queue name.";
LOG.info(message);
rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
rejectApplicationWithMessage(applicationId, message);
return;
}
@ -461,10 +462,7 @@ public class FairScheduler extends
"Reject application " + applicationId + " submitted by user " + user
+ " with an illegal queue name " + queueName + ". "
+ "The queue name cannot start/end with period.";
LOG.info(message);
rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
rejectApplicationWithMessage(applicationId, message);
return;
}
@ -476,6 +474,31 @@ public class FairScheduler extends
return;
}
if (rmApp != null && rmApp.getAMResourceRequests() != null) {
// Resources.fitsIn would always return false when queueMaxShare is 0
// for any resource, but only using Resources.fitsIn is not enough
// is it would return false for such cases when the requested
// resource is smaller than the max resource but that max resource is
// not zero, e.g. requested vCores = 2, max vCores = 1.
// With this check, we only reject those applications where resource
// requested is greater than 0 and we have 0
// of that resource on the queue.
List<MaxResourceValidationResult> invalidAMResourceRequests =
validateResourceRequests(rmApp.getAMResourceRequests(), queue);
if (!invalidAMResourceRequests.isEmpty()) {
String msg = String.format(
"Cannot submit application %s to queue %s because "
+ "it has zero amount of resource for a requested "
+ "resource! Invalid requested AM resources: %s, "
+ "maximum queue resources: %s",
applicationId, queue.getName(),
invalidAMResourceRequests, queue.getMaxShare());
rejectApplicationWithMessage(applicationId, msg);
return;
}
}
// Enforce ACLs
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(
user);
@ -485,9 +508,7 @@ public class FairScheduler extends
String msg = "User " + userUgi.getUserName()
+ " cannot submit applications to queue " + queue.getName()
+ "(requested queuename is " + queueName + ")";
LOG.info(msg);
rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg));
rejectApplicationWithMessage(applicationId, msg);
return;
}
@ -604,10 +625,7 @@ public class FairScheduler extends
}
if (appRejectMsg != null && rmApp != null) {
LOG.error(appRejectMsg);
rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(rmApp.getApplicationId(),
RMAppEventType.APP_REJECTED, appRejectMsg));
rejectApplicationWithMessage(rmApp.getApplicationId(), appRejectMsg);
return null;
}
@ -834,7 +852,6 @@ public class FairScheduler extends
List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
List<ContainerId> release, List<String> blacklistAdditions,
List<String> blacklistRemovals, ContainerUpdates updateRequests) {
// Make sure this application exists
FSAppAttempt application = getSchedulerApp(appAttemptId);
if (application == null) {
@ -854,6 +871,24 @@ public class FairScheduler extends
return EMPTY_ALLOCATION;
}
ApplicationId applicationId = application.getApplicationId();
FSLeafQueue queue = application.getQueue();
List<MaxResourceValidationResult> invalidAsks =
validateResourceRequests(ask, queue);
// We need to be fail-fast here if any invalid ask is detected.
// If we would have thrown exception later, this could be problematic as
// tokens and promoted / demoted containers would have been lost because
// scheduler would clear them right away and AM
// would not get this information.
if (!invalidAsks.isEmpty()) {
throw new SchedulerInvalidResoureRequestException(String.format(
"Resource request is invalid for application %s because queue %s "
+ "has 0 amount of resource for a resource type! "
+ "Validation result: %s",
applicationId, queue.getName(), invalidAsks));
}
// Handle promotions and demotions
handleContainerUpdates(application, updateRequests);
@ -912,6 +947,7 @@ public class FairScheduler extends
Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom);
return new Allocation(newlyAllocatedContainers, headroom,
preemptionContainerIds, null, null,
application.pullUpdatedNMTokens(), null, null,
@ -920,6 +956,34 @@ public class FairScheduler extends
application.pullPreviousAttemptContainers());
}
private List<MaxResourceValidationResult> validateResourceRequests(
List<ResourceRequest> requests, FSLeafQueue queue) {
List<MaxResourceValidationResult> validationResults = Lists.newArrayList();
for (ResourceRequest resourceRequest : requests) {
if (LOG.isTraceEnabled()) {
LOG.trace("Validating resource request: " + resourceRequest);
}
MaxResourceValidationResult validationResult =
SchedulerUtils.validateResourceRequestsAgainstQueueMaxResource(
resourceRequest, queue.getMaxShare());
if (!validationResult.isValid()) {
validationResults.add(validationResult);
LOG.warn(String.format("Queue %s cannot handle resource request" +
"because it has zero available amount of resource " +
"for a requested resource type, " +
"so the resource request is ignored!"
+ " Requested resources: %s, " +
"maximum queue resources: %s",
queue.getName(), resourceRequest.getCapability(),
queue.getMaxShare()));
}
}
return validationResults;
}
@Override
protected void nodeUpdate(RMNode nm) {
try {
@ -1060,9 +1124,14 @@ public class FairScheduler extends
Resource assignedResource = Resources.clone(Resources.none());
Resource maxResourcesToAssign = Resources.multiply(
node.getUnallocatedResource(), 0.5f);
while (node.getReservedContainer() == null) {
Resource assignment = queueMgr.getRootQueue().assignContainer(node);
if (assignment.equals(Resources.none())) {
if (LOG.isDebugEnabled()) {
LOG.debug("No container is allocated on node " + node);
}
break;
}
@ -1254,9 +1323,7 @@ public class FairScheduler extends
String message = "Application " + applicationId
+ " submitted to a reservation which is not yet "
+ "currently active: " + resQName;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
rejectApplicationWithMessage(applicationId, message);
return null;
}
if (!queue.getParent().getQueueName().equals(queueName)) {
@ -1264,9 +1331,7 @@ public class FairScheduler extends
"Application: " + applicationId + " submitted to a reservation "
+ resQName + " which does not belong to the specified queue: "
+ queueName;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
message));
rejectApplicationWithMessage(applicationId, message);
return null;
}
// use the reservation queue to run the app
@ -1279,7 +1344,13 @@ public class FairScheduler extends
} finally {
readLock.unlock();
}
}
private void rejectApplicationWithMessage(ApplicationId applicationId,
String msg) {
LOG.info(msg);
rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
applicationId, RMAppEventType.APP_REJECTED, msg));
}
private String getDefaultQueueForPlanQueue(String queueName) {

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -57,6 +58,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
public class FairSchedulerTestBase {
@ -163,37 +165,43 @@ public class FairSchedulerTestBase {
protected ApplicationAttemptId createSchedulingRequest(
int memory, int vcores, String queueId, String userId, int numContainers,
int priority) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
this.ATTEMPT_ID++);
ResourceRequest request = createResourceRequest(memory, vcores,
ResourceRequest.ANY, priority, numContainers, true);
return createSchedulingRequest(Lists.newArrayList(request), queueId,
userId);
}
protected ApplicationAttemptId createSchedulingRequest(
Collection<ResourceRequest> requests, String queueId, String userId) {
ApplicationAttemptId id =
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
// This conditional is for testAclSubmitApplication where app is rejected
// and no app is added.
if (scheduler.getSchedulerApplications().
containsKey(id.getApplicationId())) {
if (scheduler.getSchedulerApplications()
.containsKey(id.getApplicationId())) {
scheduler.addApplicationAttempt(id, false, false);
}
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request = createResourceRequest(memory, vcores,
ResourceRequest.ANY, priority, numContainers, true);
ask.add(request);
List<ResourceRequest> ask = new ArrayList<>(requests);
RMApp rmApp = mock(RMApp.class);
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
ApplicationSubmissionContext submissionContext =
mock(ApplicationSubmissionContext.class);
mock(ApplicationSubmissionContext.class);
when(submissionContext.getUnmanagedAM()).thenReturn(false);
when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext);
when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext);
Container container = mock(Container.class);
when(rmAppAttempt.getMasterContainer()).thenReturn(container);
resourceManager.getRMContext().getRMApps()
.put(id.getApplicationId(), rmApp);
.put(id.getApplicationId(), rmApp);
scheduler.allocate(id, ask, null, new ArrayList<ContainerId>(),
null, null, NULL_UPDATE_REQUESTS);
scheduler.allocate(id, ask, null, new ArrayList<>(),
null, null, NULL_UPDATE_REQUESTS);
scheduler.update();
return id;
}
@ -252,13 +260,36 @@ public class FairSchedulerTestBase {
protected void createApplicationWithAMResource(ApplicationAttemptId attId,
String queue, String user, Resource amResource) {
createApplicationWithAMResourceInternal(attId, queue, user, amResource,
null);
ApplicationId appId = attId.getApplicationId();
addApplication(queue, user, appId);
addAppAttempt(attId);
}
protected void createApplicationWithAMResource(ApplicationAttemptId attId,
String queue, String user, Resource amResource,
List<ResourceRequest> amReqs) {
createApplicationWithAMResourceInternal(attId, queue, user, amResource,
amReqs);
ApplicationId appId = attId.getApplicationId();
addApplication(queue, user, appId);
}
private void createApplicationWithAMResourceInternal(
ApplicationAttemptId attId, String queue, String user,
Resource amResource, List<ResourceRequest> amReqs) {
RMContext rmContext = resourceManager.getRMContext();
ApplicationId appId = attId.getApplicationId();
RMApp rmApp = new RMAppImpl(appId, rmContext, conf, null, user, null,
ApplicationSubmissionContext.newInstance(appId, null, queue, null,
mock(ContainerLaunchContext.class), false, false, 0, amResource,
null), scheduler, null, 0, null, null, null);
null),
scheduler, null, 0, null, null, amReqs);
rmContext.getRMApps().put(appId, rmApp);
}
private void addApplication(String queue, String user, ApplicationId appId) {
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);
resourceManager.getRMContext().getRMApps().get(appId).handle(event);
event = new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED);
@ -268,8 +299,11 @@ public class FairSchedulerTestBase {
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
appId, queue, user);
scheduler.handle(appAddedEvent);
}
private void addAppAttempt(ApplicationAttemptId attId) {
AppAttemptAddedSchedulerEvent attempAddedEvent =
new AppAttemptAddedSchedulerEvent(attId, false);
new AppAttemptAddedSchedulerEvent(attId, false);
scheduler.handle(attempAddedEvent);
}

View File

@ -41,9 +41,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.xml.parsers.ParserConfigurationException;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -69,6 +72,8 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions
.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
@ -5414,4 +5419,204 @@ public class TestFairScheduler extends FairSchedulerTestBase {
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.EXPIRE);
}
@Test
public void testAppRejectedToQueueWithZeroCapacityOfVcores()
throws IOException {
testAppRejectedToQueueWithZeroCapacityOfResource(
ResourceInformation.VCORES_URI);
}
@Test
public void testAppRejectedToQueueWithZeroCapacityOfMemory()
throws IOException {
testAppRejectedToQueueWithZeroCapacityOfResource(
ResourceInformation.MEMORY_URI);
}
private void testAppRejectedToQueueWithZeroCapacityOfResource(String resource)
throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
generateAllocationFileWithZeroResource(resource);
final List<Event> recordedEvents = Lists.newArrayList();
RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
when(mockDispatcher.getEventHandler()).thenReturn((EventHandler) event -> {
if (event instanceof RMAppEvent) {
recordedEvents.add(event);
}
});
Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
((AsyncDispatcher) mockDispatcher).start();
scheduler.setRMContext(spyContext);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// submit app with queue name (queueA)
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
ResourceRequest amReqs = ResourceRequest.newBuilder()
.capability(Resource.newInstance(5 * GB, 3)).build();
createApplicationWithAMResource(appAttemptId1, "queueA", "user1",
Resource.newInstance(GB, 1), Lists.newArrayList(amReqs));
scheduler.update();
assertEquals("Exactly one APP_REJECTED event is expected", 1,
recordedEvents.size());
Event event = recordedEvents.get(0);
RMAppEvent rmAppEvent = (RMAppEvent) event;
assertEquals(RMAppEventType.APP_REJECTED, rmAppEvent.getType());
assertTrue("Diagnostic message does not match: " +
rmAppEvent.getDiagnosticMsg(),
rmAppEvent.getDiagnosticMsg()
.matches("Cannot submit application application[\\d_]+ to queue "
+ "root.queueA because it has zero amount of resource "
+ "for a requested resource! " +
"Invalid requested AM resources: .+, "
+ "maximum queue resources: .+"));
}
private void generateAllocationFileWithZeroResource(String resource)
throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
String resources = "";
if (resource.equals(ResourceInformation.MEMORY_URI)) {
resources = "0 mb,2vcores";
} else if (resource.equals(ResourceInformation.VCORES_URI)) {
resources = "10000 mb,0vcores";
}
out.println("<minResources>" + resources + "</minResources>");
out.println("<maxResources>" + resources + "</maxResources>");
out.println("<weight>2.0</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>1 mb 1 vcores</minResources>");
out.println("<weight>0.0</weight>");
out.println("</queue>");
out.println("</allocations>");
out.close();
}
@Test
public void testSchedulingRejectedToQueueWithZeroCapacityOfMemory()
throws IOException {
// This request is not valid as queue will have 0 capacity of memory and
// the requests asks 2048M
ResourceRequest invalidRequest =
createResourceRequest(2048, 2, ResourceRequest.ANY, 1, 2, true);
ResourceRequest validRequest =
createResourceRequest(0, 0, ResourceRequest.ANY, 1, 2, true);
testSchedulingRejectedToQueueZeroCapacityOfResource(
ResourceInformation.MEMORY_URI,
Lists.newArrayList(invalidRequest, validRequest));
}
@Test
public void testSchedulingAllowedToQueueWithZeroCapacityOfMemory()
throws IOException {
testSchedulingAllowedToQueueZeroCapacityOfResource(
ResourceInformation.MEMORY_URI, 0, 2);
}
@Test
public void testSchedulingRejectedToQueueWithZeroCapacityOfVcores()
throws IOException {
// This request is not valid as queue will have 0 capacity of vCores and
// the requests asks 1
ResourceRequest invalidRequest =
createResourceRequest(0, 1, ResourceRequest.ANY, 1, 2, true);
ResourceRequest validRequest =
createResourceRequest(0, 0, ResourceRequest.ANY, 1, 2, true);
testSchedulingRejectedToQueueZeroCapacityOfResource(
ResourceInformation.VCORES_URI,
Lists.newArrayList(invalidRequest, validRequest));
}
@Test
public void testSchedulingAllowedToQueueWithZeroCapacityOfVcores()
throws IOException {
testSchedulingAllowedToQueueZeroCapacityOfResource(
ResourceInformation.VCORES_URI, 2048, 0);
}
private void testSchedulingRejectedToQueueZeroCapacityOfResource(
String resource, Collection<ResourceRequest> requests)
throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
generateAllocationFileWithZeroResource(resource);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
try {
createSchedulingRequest(requests, "queueA", "user1");
fail("Exception is expected because the queue has zero capacity of "
+ resource + " and requested resource capabilities are: "
+ requests.stream().map(ResourceRequest::getCapability)
.collect(Collectors.toList()));
} catch (SchedulerInvalidResoureRequestException e) {
assertTrue(
"The thrown exception is not the expected one. Exception message: "
+ e.getMessage(),
e.getMessage()
.matches("Resource request is invalid for application "
+ "application[\\d_]+ because queue root\\.queueA has 0 "
+ "amount of resource for a resource type! "
+ "Validation result:.*"));
List<ApplicationAttemptId> appsInQueue =
scheduler.getAppsInQueue("queueA");
assertEquals("Number of apps in queue 'queueA' should be one!", 1,
appsInQueue.size());
ApplicationAttemptId appAttemptId =
scheduler.getAppsInQueue("queueA").get(0);
assertNotNull(
"Scheduler app for appAttemptId " + appAttemptId
+ " should not be null!",
scheduler.getSchedulerApp(appAttemptId));
FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId);
assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId
+ " should not be null!", schedulerApp.getAppSchedulingInfo());
assertTrue("There should be no requests accepted", schedulerApp
.getAppSchedulingInfo().getAllResourceRequests().isEmpty());
}
}
private void testSchedulingAllowedToQueueZeroCapacityOfResource(
String resource, int memory, int vCores) throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
generateAllocationFileWithZeroResource(resource);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
createSchedulingRequest(memory, vCores, "queueA", "user1", 1, 2);
}
}