From af286319fd9bf445a4c4376133bbb1764f3d19fd Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Wed, 25 Apr 2018 22:10:18 -0700 Subject: [PATCH] YARN-8193. YARN RM hangs abruptly (stops allocating resources) when running successive applications. (Zian Chen via wangda) Change-Id: Ia83dd2499ee9000b9e09ae5a932f21a13c0ddee6 (cherry picked from commit af986b442b7be0f2fad1241ca9e267a15c20bf43) --- .../allocator/RegularContainerAllocator.java | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index afa468bca8d..99deb1abf3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -179,11 +179,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // This is to make sure non-partitioned-resource-request will prefer // to be allocated to non-partitioned nodes int missedNonPartitionedRequestSchedulingOpportunity = 0; + AppPlacementAllocator appPlacementAllocator = + appInfo.getAppPlacementAllocator(schedulerKey); + if (null == appPlacementAllocator){ + // This is possible when #pending resource decreased by a different + // thread. + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST); + return ContainerAllocation.PRIORITY_SKIPPED; + } + String requestPartition = + appPlacementAllocator.getPrimaryRequestedNodePartition(); + // Only do this when request associated with given scheduler key accepts // NO_LABEL under RESPECT_EXCLUSIVITY mode - if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, - appInfo.getAppPlacementAllocator(schedulerKey) - .getPrimaryRequestedNodePartition())) { + if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, requestPartition)) { missedNonPartitionedRequestSchedulingOpportunity = application.addMissedNonPartitionedRequestSchedulingOpportunity( schedulerKey); @@ -261,12 +272,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { return result; } - public float getLocalityWaitFactor( - SchedulerRequestKey schedulerKey, int clusterNodes) { + public float getLocalityWaitFactor(int uniqAsks, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) - int requiredResources = Math.max( - application.getAppPlacementAllocator(schedulerKey) - .getUniqueLocationAsks() - 1, 0); + int requiredResources = Math.max(uniqAsks - 1, 0); // waitFactor can't be more than '1' // i.e. no point skipping more than clustersize opportunities @@ -296,10 +304,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { if (rmContext.getScheduler().getNumClusterNodes() == 0) { return false; } + + int uniqLocationAsks = 0; + AppPlacementAllocator appPlacementAllocator = + application.getAppPlacementAllocator(schedulerKey); + if (appPlacementAllocator != null) { + uniqLocationAsks = appPlacementAllocator.getUniqueLocationAsks(); + } // If we have only ANY requests for this schedulerKey, we should not // delay its scheduling. - if (application.getAppPlacementAllocator(schedulerKey) - .getUniqueLocationAsks() == 1) { + if (uniqLocationAsks == 1) { return true; } @@ -313,7 +327,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } else { long requiredContainers = application.getOutstandingAsksCount(schedulerKey); - float localityWaitFactor = getLocalityWaitFactor(schedulerKey, + float localityWaitFactor = getLocalityWaitFactor(uniqLocationAsks, rmContext.getScheduler().getNumClusterNodes()); // Cap the delay by the number of nodes in the cluster. return (Math.min(rmContext.getScheduler().getNumClusterNodes(), @@ -806,6 +820,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { application.getAppSchedulingInfo().getAppPlacementAllocator( schedulerKey); + // This could be null when #pending request decreased by another thread. + if (schedulingPS == null) { + return new ContainerAllocation(reservedContainer, null, + AllocationState.QUEUE_SKIPPED); + } + result = ContainerAllocation.PRIORITY_SKIPPED; Iterator iter = schedulingPS.getPreferredNodeIterator(