From f48fec83d0f2d1a781a141ad7216463c5526321f Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Mon, 21 May 2018 08:00:21 -0700 Subject: [PATCH] YARN-8248. Job hangs when a job requests a resource that its queue does not have. (Szilard Nemeth via Haibo Chen) --- .../scheduler/SchedulerUtils.java | 165 ++++++++++---- .../scheduler/fair/FSAppAttempt.java | 10 +- .../scheduler/fair/FSParentQueue.java | 3 + .../scheduler/fair/FairScheduler.java | 115 ++++++++-- .../scheduler/fair/FairSchedulerTestBase.java | 64 ++++-- .../scheduler/fair/TestFairScheduler.java | 205 ++++++++++++++++++ 6 files changed, 484 insertions(+), 78 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 9b3c20a0e66..7de250d31d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -18,9 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.Set; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions + .SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AccessType; @@ -61,12 +68,37 @@ import org.apache.hadoop.yarn.util.resource.Resources; @Unstable public class SchedulerUtils { + /** + * This class contains invalid resource information along with its + * resource request. + */ + public static class MaxResourceValidationResult { + private ResourceRequest resourceRequest; + private List invalidResources; + + MaxResourceValidationResult(ResourceRequest resourceRequest, + List invalidResources) { + this.resourceRequest = resourceRequest; + this.invalidResources = invalidResources; + } + + public boolean isValid() { + return invalidResources.isEmpty(); + } + + @Override + public String toString() { + return "MaxResourceValidationResult{" + "resourceRequest=" + + resourceRequest + ", invalidResources=" + invalidResources + '}'; + } + } + private static final Log LOG = LogFactory.getLog(SchedulerUtils.class); - private static final RecordFactory recordFactory = + private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - public static final String RELEASED_CONTAINER = + public static final String RELEASED_CONTAINER = "Container released by application"; public static final String UPDATED_CONTAINER = @@ -325,6 +357,22 @@ public class SchedulerUtils { } } + private static Map getZeroResources( + Resource resource) { + Map resourceInformations = Maps.newHashMap(); + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + + for (int i = 0; i < maxLength; i++) { + ResourceInformation resourceInformation = + resource.getResourceInformation(i); + if (resourceInformation.getValue() == 0L) { + resourceInformations.put(resourceInformation.getName(), + resourceInformation); + } + } + return resourceInformations; + } + @Private @VisibleForTesting static void checkResourceRequestAgainstAvailableResource(Resource reqResource, @@ -339,49 +387,88 @@ public class SchedulerUtils { reqResourceName); } - final ResourceInformation availableRI = - availableResource.getResourceInformation(reqResourceName); - - long requestedResourceValue = requestedRI.getValue(); - long availableResourceValue = availableRI.getValue(); - int unitsRelation = UnitsConversionUtil - .compareUnits(requestedRI.getUnits(), availableRI.getUnits()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Requested resource information: " + requestedRI); - LOG.debug("Available resource information: " + availableRI); - LOG.debug("Relation of units: " + unitsRelation); - } - - // requested resource unit is less than available resource unit - // e.g. requestedUnit: "m", availableUnit: "K") - if (unitsRelation < 0) { - availableResourceValue = - UnitsConversionUtil.convert(availableRI.getUnits(), - requestedRI.getUnits(), availableRI.getValue()); - - // requested resource unit is greater than available resource unit - // e.g. requestedUnit: "G", availableUnit: "M") - } else if (unitsRelation > 0) { - requestedResourceValue = - UnitsConversionUtil.convert(requestedRI.getUnits(), - availableRI.getUnits(), requestedRI.getValue()); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Requested resource value after conversion: " + - requestedResourceValue); - LOG.info("Available resource value after conversion: " + - availableResourceValue); - } - - if (requestedResourceValue > availableResourceValue) { + boolean valid = checkResource(requestedRI, availableResource); + if (!valid) { throwInvalidResourceException(reqResource, availableResource, reqResourceName); } } } + public static MaxResourceValidationResult + validateResourceRequestsAgainstQueueMaxResource( + ResourceRequest resReq, Resource availableResource) + throws SchedulerInvalidResoureRequestException { + final Resource reqResource = resReq.getCapability(); + Map resourcesWithZeroAmount = + getZeroResources(availableResource); + + if (LOG.isTraceEnabled()) { + LOG.trace("Resources with zero amount: " + + Arrays.toString(resourcesWithZeroAmount.entrySet().toArray())); + } + + List invalidResources = Lists.newArrayList(); + for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) { + final ResourceInformation requestedRI = + reqResource.getResourceInformation(i); + final String reqResourceName = requestedRI.getName(); + + if (resourcesWithZeroAmount.containsKey(reqResourceName) + && requestedRI.getValue() > 0) { + invalidResources.add(requestedRI); + } + } + return new MaxResourceValidationResult(resReq, invalidResources); + } + + /** + * Checks requested ResouceInformation against available Resource. + * @param requestedRI + * @param availableResource + * @return true if request is valid, false otherwise. + */ + private static boolean checkResource( + ResourceInformation requestedRI, Resource availableResource) { + final ResourceInformation availableRI = + availableResource.getResourceInformation(requestedRI.getName()); + + long requestedResourceValue = requestedRI.getValue(); + long availableResourceValue = availableRI.getValue(); + int unitsRelation = UnitsConversionUtil.compareUnits(requestedRI.getUnits(), + availableRI.getUnits()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Requested resource information: " + requestedRI); + LOG.debug("Available resource information: " + availableRI); + LOG.debug("Relation of units: " + unitsRelation); + } + + // requested resource unit is less than available resource unit + // e.g. requestedUnit: "m", availableUnit: "K") + if (unitsRelation < 0) { + availableResourceValue = + UnitsConversionUtil.convert(availableRI.getUnits(), + requestedRI.getUnits(), availableRI.getValue()); + + // requested resource unit is greater than available resource unit + // e.g. requestedUnit: "G", availableUnit: "M") + } else if (unitsRelation > 0) { + requestedResourceValue = + UnitsConversionUtil.convert(requestedRI.getUnits(), + availableRI.getUnits(), requestedRI.getValue()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Requested resource value after conversion: " + + requestedResourceValue); + LOG.info("Available resource value after conversion: " + + availableResourceValue); + } + + return requestedResourceValue <= availableResourceValue; + } + private static void throwInvalidResourceException(Resource reqResource, Resource availableResource, String reqResourceName) throws InvalidResourceRequestException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 0305702fb5f..281aded9e8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -459,7 +459,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Add it to allContainers list. addToNewlyAllocatedContainers(node, rmContainer); liveContainers.put(container.getId(), rmContainer); - // Update consumption and track allocations ContainerRequest containerRequest = appSchedulingInfo.allocate( type, node, schedulerKey, container); @@ -867,6 +866,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt if (reserved) { unreserve(schedulerKey, node); } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Resource ask %s fits in available node resources %s, " + + "but no container was allocated", + capability, available)); + } return Resources.none(); } @@ -1096,7 +1101,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } else if (!getQueue().fitsInMaxShare(resource)) { // The requested container must fit in queue maximum share updateAMDiagnosticMsg(resource, - " exceeds current queue or its parents maximum resource allowed)."); + " exceeds current queue or its parents maximum resource allowed). " + + "Max share of queue: " + getQueue().getMaxShare()); ret = false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index a8e53fc26f2..26c5630a6d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -182,6 +182,9 @@ public class FSParentQueue extends FSQueue { // If this queue is over its limit, reject if (!assignContainerPreCheck(node)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Assign container precheck on node " + node + " failed"); + } return assigned; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 1f85814adac..1c4bd51473d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -42,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions + .SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; @@ -73,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils.MaxResourceValidationResult; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -449,10 +453,7 @@ public class FairScheduler extends String message = "Reject application " + applicationId + " submitted by user " + user + " with an empty queue name."; - LOG.info(message); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, - message)); + rejectApplicationWithMessage(applicationId, message); return; } @@ -461,10 +462,7 @@ public class FairScheduler extends "Reject application " + applicationId + " submitted by user " + user + " with an illegal queue name " + queueName + ". " + "The queue name cannot start/end with period."; - LOG.info(message); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, - message)); + rejectApplicationWithMessage(applicationId, message); return; } @@ -476,6 +474,31 @@ public class FairScheduler extends return; } + if (rmApp != null && rmApp.getAMResourceRequests() != null) { + // Resources.fitsIn would always return false when queueMaxShare is 0 + // for any resource, but only using Resources.fitsIn is not enough + // is it would return false for such cases when the requested + // resource is smaller than the max resource but that max resource is + // not zero, e.g. requested vCores = 2, max vCores = 1. + // With this check, we only reject those applications where resource + // requested is greater than 0 and we have 0 + // of that resource on the queue. + List invalidAMResourceRequests = + validateResourceRequests(rmApp.getAMResourceRequests(), queue); + + if (!invalidAMResourceRequests.isEmpty()) { + String msg = String.format( + "Cannot submit application %s to queue %s because " + + "it has zero amount of resource for a requested " + + "resource! Invalid requested AM resources: %s, " + + "maximum queue resources: %s", + applicationId, queue.getName(), + invalidAMResourceRequests, queue.getMaxShare()); + rejectApplicationWithMessage(applicationId, msg); + return; + } + } + // Enforce ACLs UserGroupInformation userUgi = UserGroupInformation.createRemoteUser( user); @@ -485,9 +508,7 @@ public class FairScheduler extends String msg = "User " + userUgi.getUserName() + " cannot submit applications to queue " + queue.getName() + "(requested queuename is " + queueName + ")"; - LOG.info(msg); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg)); + rejectApplicationWithMessage(applicationId, msg); return; } @@ -604,10 +625,7 @@ public class FairScheduler extends } if (appRejectMsg != null && rmApp != null) { - LOG.error(appRejectMsg); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(rmApp.getApplicationId(), - RMAppEventType.APP_REJECTED, appRejectMsg)); + rejectApplicationWithMessage(rmApp.getApplicationId(), appRejectMsg); return null; } @@ -834,7 +852,6 @@ public class FairScheduler extends List ask, List schedulingRequests, List release, List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { - // Make sure this application exists FSAppAttempt application = getSchedulerApp(appAttemptId); if (application == null) { @@ -854,6 +871,24 @@ public class FairScheduler extends return EMPTY_ALLOCATION; } + ApplicationId applicationId = application.getApplicationId(); + FSLeafQueue queue = application.getQueue(); + List invalidAsks = + validateResourceRequests(ask, queue); + + // We need to be fail-fast here if any invalid ask is detected. + // If we would have thrown exception later, this could be problematic as + // tokens and promoted / demoted containers would have been lost because + // scheduler would clear them right away and AM + // would not get this information. + if (!invalidAsks.isEmpty()) { + throw new SchedulerInvalidResoureRequestException(String.format( + "Resource request is invalid for application %s because queue %s " + + "has 0 amount of resource for a resource type! " + + "Validation result: %s", + applicationId, queue.getName(), invalidAsks)); + } + // Handle promotions and demotions handleContainerUpdates(application, updateRequests); @@ -912,6 +947,7 @@ public class FairScheduler extends Resource headroom = application.getHeadroom(); application.setApplicationHeadroomForMetrics(headroom); + return new Allocation(newlyAllocatedContainers, headroom, preemptionContainerIds, null, null, application.pullUpdatedNMTokens(), null, null, @@ -920,6 +956,34 @@ public class FairScheduler extends application.pullPreviousAttemptContainers()); } + private List validateResourceRequests( + List requests, FSLeafQueue queue) { + List validationResults = Lists.newArrayList(); + + for (ResourceRequest resourceRequest : requests) { + if (LOG.isTraceEnabled()) { + LOG.trace("Validating resource request: " + resourceRequest); + } + + MaxResourceValidationResult validationResult = + SchedulerUtils.validateResourceRequestsAgainstQueueMaxResource( + resourceRequest, queue.getMaxShare()); + if (!validationResult.isValid()) { + validationResults.add(validationResult); + LOG.warn(String.format("Queue %s cannot handle resource request" + + "because it has zero available amount of resource " + + "for a requested resource type, " + + "so the resource request is ignored!" + + " Requested resources: %s, " + + "maximum queue resources: %s", + queue.getName(), resourceRequest.getCapability(), + queue.getMaxShare())); + } + } + + return validationResults; + } + @Override protected void nodeUpdate(RMNode nm) { try { @@ -1060,9 +1124,14 @@ public class FairScheduler extends Resource assignedResource = Resources.clone(Resources.none()); Resource maxResourcesToAssign = Resources.multiply( node.getUnallocatedResource(), 0.5f); + while (node.getReservedContainer() == null) { Resource assignment = queueMgr.getRootQueue().assignContainer(node); + if (assignment.equals(Resources.none())) { + if (LOG.isDebugEnabled()) { + LOG.debug("No container is allocated on node " + node); + } break; } @@ -1254,9 +1323,7 @@ public class FairScheduler extends String message = "Application " + applicationId + " submitted to a reservation which is not yet " + "currently active: " + resQName; - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, - message)); + rejectApplicationWithMessage(applicationId, message); return null; } if (!queue.getParent().getQueueName().equals(queueName)) { @@ -1264,9 +1331,7 @@ public class FairScheduler extends "Application: " + applicationId + " submitted to a reservation " + resQName + " which does not belong to the specified queue: " + queueName; - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, - message)); + rejectApplicationWithMessage(applicationId, message); return null; } // use the reservation queue to run the app @@ -1279,7 +1344,13 @@ public class FairScheduler extends } finally { readLock.unlock(); } + } + private void rejectApplicationWithMessage(ApplicationId applicationId, + String msg) { + LOG.info(msg); + rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent( + applicationId, RMAppEventType.APP_REJECTED, msg)); } private String getDefaultQueueForPlanQueue(String queueName) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index b99856467cf..3ac3849cf73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -57,6 +58,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; import java.util.ArrayList; +import java.util.Collection; import java.util.List; public class FairSchedulerTestBase { @@ -163,37 +165,43 @@ public class FairSchedulerTestBase { protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { - ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, - this.ATTEMPT_ID++); + ResourceRequest request = createResourceRequest(memory, vcores, + ResourceRequest.ANY, priority, numContainers, true); + return createSchedulingRequest(Lists.newArrayList(request), queueId, + userId); + } + + protected ApplicationAttemptId createSchedulingRequest( + Collection requests, String queueId, String userId) { + ApplicationAttemptId id = + createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(id.getApplicationId(), queueId, userId, false); // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. - if (scheduler.getSchedulerApplications(). - containsKey(id.getApplicationId())) { + if (scheduler.getSchedulerApplications() + .containsKey(id.getApplicationId())) { scheduler.addApplicationAttempt(id, false, false); } - List ask = new ArrayList(); - ResourceRequest request = createResourceRequest(memory, vcores, - ResourceRequest.ANY, priority, numContainers, true); - ask.add(request); + + List ask = new ArrayList<>(requests); RMApp rmApp = mock(RMApp.class); RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( - new RMAppAttemptMetrics(id, resourceManager.getRMContext())); + new RMAppAttemptMetrics(id, resourceManager.getRMContext())); ApplicationSubmissionContext submissionContext = - mock(ApplicationSubmissionContext.class); + mock(ApplicationSubmissionContext.class); when(submissionContext.getUnmanagedAM()).thenReturn(false); when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext); when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext); Container container = mock(Container.class); when(rmAppAttempt.getMasterContainer()).thenReturn(container); resourceManager.getRMContext().getRMApps() - .put(id.getApplicationId(), rmApp); + .put(id.getApplicationId(), rmApp); - scheduler.allocate(id, ask, null, new ArrayList(), - null, null, NULL_UPDATE_REQUESTS); + scheduler.allocate(id, ask, null, new ArrayList<>(), + null, null, NULL_UPDATE_REQUESTS); scheduler.update(); return id; } @@ -252,13 +260,36 @@ public class FairSchedulerTestBase { protected void createApplicationWithAMResource(ApplicationAttemptId attId, String queue, String user, Resource amResource) { + createApplicationWithAMResourceInternal(attId, queue, user, amResource, + null); + ApplicationId appId = attId.getApplicationId(); + addApplication(queue, user, appId); + addAppAttempt(attId); + } + + protected void createApplicationWithAMResource(ApplicationAttemptId attId, + String queue, String user, Resource amResource, + List amReqs) { + createApplicationWithAMResourceInternal(attId, queue, user, amResource, + amReqs); + ApplicationId appId = attId.getApplicationId(); + addApplication(queue, user, appId); + } + + private void createApplicationWithAMResourceInternal( + ApplicationAttemptId attId, String queue, String user, + Resource amResource, List amReqs) { RMContext rmContext = resourceManager.getRMContext(); ApplicationId appId = attId.getApplicationId(); RMApp rmApp = new RMAppImpl(appId, rmContext, conf, null, user, null, ApplicationSubmissionContext.newInstance(appId, null, queue, null, mock(ContainerLaunchContext.class), false, false, 0, amResource, - null), scheduler, null, 0, null, null, null); + null), + scheduler, null, 0, null, null, amReqs); rmContext.getRMApps().put(appId, rmApp); + } + + private void addApplication(String queue, String user, ApplicationId appId) { RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START); resourceManager.getRMContext().getRMApps().get(appId).handle(event); event = new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED); @@ -268,8 +299,11 @@ public class FairSchedulerTestBase { AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent( appId, queue, user); scheduler.handle(appAddedEvent); + } + + private void addAppAttempt(ApplicationAttemptId attId) { AppAttemptAddedSchedulerEvent attempAddedEvent = - new AppAttemptAddedSchedulerEvent(attId, false); + new AppAttemptAddedSchedulerEvent(attId, false); scheduler.handle(attempAddedEvent); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index d9c06a79db2..2f6c2cf2595 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -41,9 +41,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import javax.xml.parsers.ParserConfigurationException; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.HAServiceProtocol; @@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -69,6 +72,8 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions + .SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; @@ -5414,4 +5419,204 @@ public class TestFairScheduler extends FairSchedulerTestBase { SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.EXPIRE); } + + @Test + public void testAppRejectedToQueueWithZeroCapacityOfVcores() + throws IOException { + testAppRejectedToQueueWithZeroCapacityOfResource( + ResourceInformation.VCORES_URI); + } + + @Test + public void testAppRejectedToQueueWithZeroCapacityOfMemory() + throws IOException { + testAppRejectedToQueueWithZeroCapacityOfResource( + ResourceInformation.MEMORY_URI); + } + + private void testAppRejectedToQueueWithZeroCapacityOfResource(String resource) + throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + generateAllocationFileWithZeroResource(resource); + + final List recordedEvents = Lists.newArrayList(); + + RMContext spyContext = Mockito.spy(resourceManager.getRMContext()); + Dispatcher mockDispatcher = mock(AsyncDispatcher.class); + when(mockDispatcher.getEventHandler()).thenReturn((EventHandler) event -> { + if (event instanceof RMAppEvent) { + recordedEvents.add(event); + } + }); + Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher(); + ((AsyncDispatcher) mockDispatcher).start(); + + scheduler.setRMContext(spyContext); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // submit app with queue name (queueA) + ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1); + + ResourceRequest amReqs = ResourceRequest.newBuilder() + .capability(Resource.newInstance(5 * GB, 3)).build(); + createApplicationWithAMResource(appAttemptId1, "queueA", "user1", + Resource.newInstance(GB, 1), Lists.newArrayList(amReqs)); + scheduler.update(); + + assertEquals("Exactly one APP_REJECTED event is expected", 1, + recordedEvents.size()); + Event event = recordedEvents.get(0); + RMAppEvent rmAppEvent = (RMAppEvent) event; + assertEquals(RMAppEventType.APP_REJECTED, rmAppEvent.getType()); + assertTrue("Diagnostic message does not match: " + + rmAppEvent.getDiagnosticMsg(), + rmAppEvent.getDiagnosticMsg() + .matches("Cannot submit application application[\\d_]+ to queue " + + "root.queueA because it has zero amount of resource " + + "for a requested resource! " + + "Invalid requested AM resources: .+, " + + "maximum queue resources: .+")); + } + + private void generateAllocationFileWithZeroResource(String resource) + throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + + String resources = ""; + if (resource.equals(ResourceInformation.MEMORY_URI)) { + resources = "0 mb,2vcores"; + } else if (resource.equals(ResourceInformation.VCORES_URI)) { + resources = "10000 mb,0vcores"; + } + out.println("" + resources + ""); + out.println("" + resources + ""); + out.println("2.0"); + out.println(""); + out.println(""); + out.println("1 mb 1 vcores"); + out.println("0.0"); + out.println(""); + out.println(""); + out.close(); + } + + @Test + public void testSchedulingRejectedToQueueWithZeroCapacityOfMemory() + throws IOException { + // This request is not valid as queue will have 0 capacity of memory and + // the requests asks 2048M + ResourceRequest invalidRequest = + createResourceRequest(2048, 2, ResourceRequest.ANY, 1, 2, true); + + ResourceRequest validRequest = + createResourceRequest(0, 0, ResourceRequest.ANY, 1, 2, true); + testSchedulingRejectedToQueueZeroCapacityOfResource( + ResourceInformation.MEMORY_URI, + Lists.newArrayList(invalidRequest, validRequest)); + } + + @Test + public void testSchedulingAllowedToQueueWithZeroCapacityOfMemory() + throws IOException { + testSchedulingAllowedToQueueZeroCapacityOfResource( + ResourceInformation.MEMORY_URI, 0, 2); + } + + @Test + public void testSchedulingRejectedToQueueWithZeroCapacityOfVcores() + throws IOException { + // This request is not valid as queue will have 0 capacity of vCores and + // the requests asks 1 + ResourceRequest invalidRequest = + createResourceRequest(0, 1, ResourceRequest.ANY, 1, 2, true); + + ResourceRequest validRequest = + createResourceRequest(0, 0, ResourceRequest.ANY, 1, 2, true); + + testSchedulingRejectedToQueueZeroCapacityOfResource( + ResourceInformation.VCORES_URI, + Lists.newArrayList(invalidRequest, validRequest)); + } + + @Test + public void testSchedulingAllowedToQueueWithZeroCapacityOfVcores() + throws IOException { + testSchedulingAllowedToQueueZeroCapacityOfResource( + ResourceInformation.VCORES_URI, 2048, 0); + } + + private void testSchedulingRejectedToQueueZeroCapacityOfResource( + String resource, Collection requests) + throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + generateAllocationFileWithZeroResource(resource); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + try { + createSchedulingRequest(requests, "queueA", "user1"); + fail("Exception is expected because the queue has zero capacity of " + + resource + " and requested resource capabilities are: " + + requests.stream().map(ResourceRequest::getCapability) + .collect(Collectors.toList())); + } catch (SchedulerInvalidResoureRequestException e) { + assertTrue( + "The thrown exception is not the expected one. Exception message: " + + e.getMessage(), + e.getMessage() + .matches("Resource request is invalid for application " + + "application[\\d_]+ because queue root\\.queueA has 0 " + + "amount of resource for a resource type! " + + "Validation result:.*")); + + List appsInQueue = + scheduler.getAppsInQueue("queueA"); + assertEquals("Number of apps in queue 'queueA' should be one!", 1, + appsInQueue.size()); + + ApplicationAttemptId appAttemptId = + scheduler.getAppsInQueue("queueA").get(0); + assertNotNull( + "Scheduler app for appAttemptId " + appAttemptId + + " should not be null!", + scheduler.getSchedulerApp(appAttemptId)); + + FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId); + assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId + + " should not be null!", schedulerApp.getAppSchedulingInfo()); + + assertTrue("There should be no requests accepted", schedulerApp + .getAppSchedulingInfo().getAllResourceRequests().isEmpty()); + } + } + + private void testSchedulingAllowedToQueueZeroCapacityOfResource( + String resource, int memory, int vCores) throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + generateAllocationFileWithZeroResource(resource); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + createSchedulingRequest(memory, vCores, "queueA", "user1", 1, 2); + } }