diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 228c6af3b4c..2339c79f62b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -589,7 +590,8 @@ public class YARNRunner implements ClientProtocol { amResourceRequest.setCapability(capability); amResourceRequest.setNumContainers(1); amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim()); - appContext.setAMContainerResourceRequest(amResourceRequest); + appContext.setAMContainerResourceRequests( + Collections.singletonList(amResourceRequest)); } // set labels for the Job containers appContext.setNodeLabelExpression(jobConf diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index c6da9a32201..cb355b5cadd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -571,7 +571,7 @@ public class TestYARNRunner { buildSubmitContext(yarnRunner, jobConf); assertEquals(appSubCtx.getNodeLabelExpression(), "GPU"); - assertEquals(appSubCtx.getAMContainerResourceRequest() + assertEquals(appSubCtx.getAMContainerResourceRequests().get(0) .getNodeLabelExpression(), "highMem"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index e562aaae5c3..4f1d147791c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.api.records; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; @@ -100,7 +102,7 @@ public abstract class ApplicationSubmissionContext { amReq.setNumContainers(1); amReq.setRelaxLocality(true); amReq.setNodeLabelExpression(amContainerLabelExpression); - context.setAMContainerResourceRequest(amReq); + context.setAMContainerResourceRequests(Collections.singletonList(amReq)); return context; } @@ -159,7 +161,8 @@ public abstract class ApplicationSubmissionContext { context.setApplicationType(applicationType); context.setKeepContainersAcrossApplicationAttempts(keepContainers); context.setNodeLabelExpression(appLabelExpression); - context.setAMContainerResourceRequest(resourceRequest); + context.setAMContainerResourceRequests( + Collections.singletonList(resourceRequest)); return context; } @@ -454,29 +457,61 @@ public abstract class ApplicationSubmissionContext { public abstract void setNodeLabelExpression(String nodeLabelExpression); /** - * Get ResourceRequest of AM container, if this is not null, scheduler will - * use this to acquire resource for AM container. - * + * Get the ResourceRequest of the AM container. + * + * If this is not null, scheduler will use this to acquire resource for AM + * container. + * * If this is null, scheduler will assemble a ResourceRequest by using * getResource and getPriority of * ApplicationSubmissionContext. - * - * Number of containers and Priority will be ignore. - * - * @return ResourceRequest of AM container + * + * Number of containers and Priority will be ignored. + * + * @return ResourceRequest of the AM container + * @deprecated See {@link #getAMContainerResourceRequests()} */ @Public @Evolving + @Deprecated public abstract ResourceRequest getAMContainerResourceRequest(); /** - * Set ResourceRequest of AM container - * @param request of AM container + * Set ResourceRequest of the AM container + * @param request of the AM container + * @deprecated See {@link #setAMContainerResourceRequests(List)} */ @Public @Evolving + @Deprecated public abstract void setAMContainerResourceRequest(ResourceRequest request); + /** + * Get the ResourceRequests of the AM container. + * + * If this is not null, scheduler will use this to acquire resource for AM + * container. + * + * If this is null, scheduler will use the ResourceRequest as determined by + * getAMContainerResourceRequest and its behavior. + * + * Number of containers and Priority will be ignored. + * + * @return List of ResourceRequests of the AM container + */ + @Public + @Evolving + public abstract List getAMContainerResourceRequests(); + + /** + * Set ResourceRequests of the AM container. + * @param requests of the AM container + */ + @Public + @Evolving + public abstract void setAMContainerResourceRequests( + List requests); + /** * Get the attemptFailuresValidityInterval in milliseconds for the application * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index fbb2089237d..ff30c37f854 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -383,7 +383,7 @@ message ApplicationSubmissionContextProto { optional LogAggregationContextProto log_aggregation_context = 14; optional ReservationIdProto reservation_id = 15; optional string node_label_expression = 16; - optional ResourceRequestProto am_container_resource_request = 17; + repeated ResourceRequestProto am_container_resource_request = 17; repeated ApplicationTimeoutMapProto application_timeouts = 18; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 62b54e7ed8e..1a6719ad095 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.api.records.impl.pb; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -66,7 +68,7 @@ extends ApplicationSubmissionContext { private ContainerLaunchContext amContainer = null; private Resource resource = null; private Set applicationTags = null; - private ResourceRequest amResourceRequest = null; + private List amResourceRequests = null; private LogAggregationContext logAggregationContext = null; private ReservationId reservationId = null; private Map applicationTimeouts = null; @@ -127,9 +129,10 @@ extends ApplicationSubmissionContext { builder.clearApplicationTags(); builder.addAllApplicationTags(this.applicationTags); } - if (this.amResourceRequest != null) { - builder.setAmContainerResourceRequest( - convertToProtoFormat(this.amResourceRequest)); + if (this.amResourceRequests != null) { + builder.clearAmContainerResourceRequest(); + builder.addAllAmContainerResourceRequest( + convertToProtoFormat(this.amResourceRequests)); } if (this.logAggregationContext != null) { builder.setLogAggregationContext( @@ -430,13 +433,23 @@ extends ApplicationSubmissionContext { private PriorityProto convertToProtoFormat(Priority t) { return ((PriorityPBImpl)t).getProto(); } - - private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) { - return new ResourceRequestPBImpl(p); + + private List convertFromProtoFormat( + List ps) { + List rs = new ArrayList<>(); + for (ResourceRequestProto p : ps) { + rs.add(new ResourceRequestPBImpl(p)); + } + return rs; } - private ResourceRequestProto convertToProtoFormat(ResourceRequest t) { - return ((ResourceRequestPBImpl)t).getProto(); + private List convertToProtoFormat( + List ts) { + List rs = new ArrayList<>(ts.size()); + for (ResourceRequest t : ts) { + rs.add(((ResourceRequestPBImpl)t).getProto()); + } + return rs; } private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { @@ -485,25 +498,46 @@ extends ApplicationSubmissionContext { } @Override + @Deprecated public ResourceRequest getAMContainerResourceRequest() { - ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; - if (this.amResourceRequest != null) { - return amResourceRequest; - } // Else via proto - if (!p.hasAmContainerResourceRequest()) { + List reqs = getAMContainerResourceRequests(); + if (reqs == null || reqs.isEmpty()) { return null; } - amResourceRequest = convertFromProtoFormat(p.getAmContainerResourceRequest()); - return amResourceRequest; + return getAMContainerResourceRequests().get(0); } @Override + public List getAMContainerResourceRequests() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.amResourceRequests != null) { + return amResourceRequests; + } // Else via proto + if (p.getAmContainerResourceRequestCount() == 0) { + return null; + } + amResourceRequests = + convertFromProtoFormat(p.getAmContainerResourceRequestList()); + return amResourceRequests; + } + + @Override + @Deprecated public void setAMContainerResourceRequest(ResourceRequest request) { maybeInitBuilder(); if (request == null) { builder.clearAmContainerResourceRequest(); } - this.amResourceRequest = request; + this.amResourceRequests = Collections.singletonList(request); + } + + @Override + public void setAMContainerResourceRequests(List requests) { + maybeInitBuilder(); + if (requests == null) { + builder.clearAmContainerResourceRequest(); + } + this.amResourceRequests = requests; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index f3f4ba0ff90..60ade2dbcaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -801,6 +801,28 @@ public class CommonNodeLabelsManager extends AbstractService { } } + /** + * Get nodes that have no labels. + * + * @return set of nodes with no labels + */ + public Set getNodesWithoutALabel() { + try { + readLock.lock(); + Set nodes = new HashSet<>(); + for (Host host : nodeCollections.values()) { + for (NodeId nodeId : host.nms.keySet()) { + if (getLabelsByNode(nodeId).isEmpty()) { + nodes.add(nodeId); + } + } + } + return Collections.unmodifiableSet(nodes); + } finally { + readLock.unlock(); + } + } + /** * Get mapping of labels to nodes for all the labels. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 7c8408eae5a..0e6aa391109 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import java.util.Collections; import java.util.LinkedList; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -31,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -335,14 +339,16 @@ public class RMAppManager implements EventHandler, // has been disabled. Reject the recovery of this application if it // is true and give clear message so that user can react properly. if (!appContext.getUnmanagedAM() && - application.getAMResourceRequest() == null && + (application.getAMResourceRequests() == null || + application.getAMResourceRequests().isEmpty()) && !YarnConfiguration.areNodeLabelsEnabled(this.conf)) { // check application submission context and see if am resource request // or application itself contains any node label expression. - ResourceRequest amReqFromAppContext = - appContext.getAMContainerResourceRequest(); - String labelExp = (amReqFromAppContext != null) ? - amReqFromAppContext.getNodeLabelExpression() : null; + List amReqsFromAppContext = + appContext.getAMContainerResourceRequests(); + String labelExp = + (amReqsFromAppContext != null && !amReqsFromAppContext.isEmpty()) ? + amReqsFromAppContext.get(0).getNodeLabelExpression() : null; if (labelExp == null) { labelExp = appContext.getNodeLabelExpression(); } @@ -377,9 +383,9 @@ public class RMAppManager implements EventHandler, } ApplicationId applicationId = submissionContext.getApplicationId(); - ResourceRequest amReq = null; + List amReqs = null; try { - amReq = validateAndCreateResourceRequest(submissionContext, isRecovery); + amReqs = validateAndCreateResourceRequest(submissionContext, isRecovery); } catch (InvalidLabelResourceRequestException e) { // This can happen if the application had been submitted and run // with Node Label enabled but recover with Node Label disabled. @@ -440,7 +446,7 @@ public class RMAppManager implements EventHandler, submissionContext.getApplicationName(), user, submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), amReq); + submissionContext.getApplicationTags(), amReqs); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not @@ -462,7 +468,7 @@ public class RMAppManager implements EventHandler, return application; } - private ResourceRequest validateAndCreateResourceRequest( + private List validateAndCreateResourceRequest( ApplicationSubmissionContext submissionContext, boolean isRecovery) throws InvalidResourceRequestException { // Validation of the ApplicationSubmissionContext needs to be completed @@ -472,33 +478,77 @@ public class RMAppManager implements EventHandler, // Check whether AM resource requirements are within required limits if (!submissionContext.getUnmanagedAM()) { - ResourceRequest amReq = submissionContext.getAMContainerResourceRequest(); - if (amReq == null) { - amReq = BuilderUtils - .newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY, - ResourceRequest.ANY, submissionContext.getResource(), 1); - } - - // set label expression for AM container - if (null == amReq.getNodeLabelExpression()) { - amReq.setNodeLabelExpression(submissionContext - .getNodeLabelExpression()); + List amReqs = + submissionContext.getAMContainerResourceRequests(); + if (amReqs == null || amReqs.isEmpty()) { + if (submissionContext.getResource() != null) { + amReqs = Collections.singletonList(BuilderUtils + .newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY, + ResourceRequest.ANY, submissionContext.getResource(), 1)); + } else { + throw new InvalidResourceRequestException("Invalid resource request, " + + "no resources requested"); + } } try { - SchedulerUtils.normalizeAndValidateRequest(amReq, - scheduler.getMaximumResourceCapability(), - submissionContext.getQueue(), scheduler, isRecovery, rmContext); + // Find the ANY request and ensure there's only one + ResourceRequest anyReq = null; + for (ResourceRequest amReq : amReqs) { + if (amReq.getResourceName().equals(ResourceRequest.ANY)) { + if (anyReq == null) { + anyReq = amReq; + } else { + throw new InvalidResourceRequestException("Invalid resource " + + "request, only one resource request with " + + ResourceRequest.ANY + " is allowed"); + } + } + } + if (anyReq == null) { + throw new InvalidResourceRequestException("Invalid resource request, " + + "no resource request specified with " + ResourceRequest.ANY); + } + + // Make sure that all of the requests agree with the ANY request + // and have correct values + for (ResourceRequest amReq : amReqs) { + amReq.setCapability(anyReq.getCapability()); + amReq.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)); + amReq.setNumContainers(1); + amReq.setPriority(RMAppAttemptImpl.AM_CONTAINER_PRIORITY); + } + + // set label expression for AM ANY request if not set + if (null == anyReq.getNodeLabelExpression()) { + anyReq.setNodeLabelExpression(submissionContext + .getNodeLabelExpression()); + } + + // Put ANY request at the front + if (!amReqs.get(0).equals(anyReq)) { + amReqs.remove(anyReq); + amReqs.add(0, anyReq); + } + + // Normalize all requests + for (ResourceRequest amReq : amReqs) { + SchedulerUtils.normalizeAndValidateRequest(amReq, + scheduler.getMaximumResourceCapability(), + submissionContext.getQueue(), scheduler, isRecovery, rmContext); + + amReq.setCapability( + scheduler.getNormalizedResource(amReq.getCapability())); + } + return amReqs; } catch (InvalidResourceRequestException e) { LOG.warn("RM app submission failed in validating AM resource request" + " for application " + submissionContext.getApplicationId(), e); throw e; } - - amReq.setCapability(scheduler.getNormalizedResource(amReq.getCapability())); - return amReq; } - + return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 49aa3b06dde..92c23135dd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -21,13 +21,16 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; import java.text.ParseException; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.AccessControlException; @@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -564,19 +568,65 @@ public class RMServerUtils { * * @param rmContext context * @param conf configuration - * @param amreq am resource request + * @param amReqs am resource requests * @return applicable node count */ public static int getApplicableNodeCountForAM(RMContext rmContext, - Configuration conf, ResourceRequest amreq) { - if (YarnConfiguration.areNodeLabelsEnabled(conf)) { - RMNodeLabelsManager labelManager = rmContext.getNodeLabelManager(); - String amNodeLabelExpression = amreq.getNodeLabelExpression(); - amNodeLabelExpression = (amNodeLabelExpression == null - || amNodeLabelExpression.trim().isEmpty()) - ? RMNodeLabelsManager.NO_LABEL : amNodeLabelExpression; - return labelManager.getActiveNMCountPerLabel(amNodeLabelExpression); + Configuration conf, List amReqs) { + // Determine the list of nodes that are eligible based on the strict + // resource requests + Set nodesForReqs = new HashSet<>(); + for (ResourceRequest amReq : amReqs) { + if (amReq.getRelaxLocality() && + !amReq.getResourceName().equals(ResourceRequest.ANY)) { + nodesForReqs.addAll( + rmContext.getScheduler().getNodeIds(amReq.getResourceName())); + } + } + + if (YarnConfiguration.areNodeLabelsEnabled(conf)) { + // Determine the list of nodes that are eligible based on the node label + String amNodeLabelExpression = amReqs.get(0).getNodeLabelExpression(); + Set nodesForLabels = + getNodeIdsForLabel(rmContext, amNodeLabelExpression); + if (nodesForLabels != null && !nodesForLabels.isEmpty()) { + // If only node labels, strip out any wildcard NodeIds and return + if (nodesForReqs.isEmpty()) { + for (Iterator it = nodesForLabels.iterator(); it.hasNext();) { + if (it.next().getPort() == 0) { + it.remove(); + } + } + return nodesForLabels.size(); + } else { + // The NodeIds common to both the strict resource requests and the + // node label is the eligible set + return Sets.intersection(nodesForReqs, nodesForLabels).size(); + } + } + } + + // If no strict resource request NodeIds nor node label NodeIds, then just + // return the entire cluster + if (nodesForReqs.isEmpty()) { + return rmContext.getScheduler().getNumClusterNodes(); + } + // No node label NodeIds, so return the strict resource request NodeIds + return nodesForReqs.size(); + } + + private static Set getNodeIdsForLabel(RMContext rmContext, + String label) { + label = (label == null || label.trim().isEmpty()) + ? RMNodeLabelsManager.NO_LABEL : label; + if (label.equals(RMNodeLabelsManager.NO_LABEL)) { + // NO_LABEL nodes aren't tracked directly + return rmContext.getNodeLabelManager().getNodesWithoutALabel(); + } else { + Map> labelsToNodes = + rmContext.getNodeLabelManager().getLabelsToNodes( + Collections.singleton(label)); + return labelsToNodes.get(label); } - return rmContext.getScheduler().getNumClusterNodes(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index e55649d60a9..a42b3bfc0c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; @@ -246,7 +247,7 @@ public interface RMApp extends EventHandler { ReservationId getReservationId(); - ResourceRequest getAMResourceRequest(); + List getAMResourceRequests(); Map getLogAggregationReportsForApp(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index bda87c4d6f2..41f6e046588 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -194,7 +194,7 @@ public class RMAppImpl implements RMApp, Recoverable { private RMAppEvent eventCausingFinalSaving; private RMAppState targetedFinalState; private RMAppState recoveredFinalState; - private ResourceRequest amReq; + private List amReqs; private CallerContext callerContext; @@ -405,8 +405,8 @@ public class RMAppImpl implements RMApp, Recoverable { Configuration config, String name, String user, String queue, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, ApplicationMasterService masterService, long submitTime, - String applicationType, Set applicationTags, - ResourceRequest amReq) { + String applicationType, Set applicationTags, + List amReqs) { this.systemClock = SystemClock.getInstance(); @@ -425,7 +425,7 @@ public class RMAppImpl implements RMApp, Recoverable { this.startTime = this.systemClock.getTime(); this.applicationType = applicationType; this.applicationTags = applicationTags; - this.amReq = amReq; + this.amReqs = amReqs; if (submissionContext.getPriority() != null) { this.applicationPriority = Priority .newInstance(submissionContext.getPriority().getPriority()); @@ -919,7 +919,7 @@ public class RMAppImpl implements RMApp, Recoverable { if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) { currentAMBlacklistManager = new SimpleBlacklistManager( RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, - getAMResourceRequest()), + getAMResourceRequests()), blacklistDisableThreshold); } else { currentAMBlacklistManager = new DisabledBlacklistManager(); @@ -927,7 +927,7 @@ public class RMAppImpl implements RMApp, Recoverable { } RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, - submissionContext, conf, amReq, this, currentAMBlacklistManager); + submissionContext, conf, amReqs, this, currentAMBlacklistManager); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } @@ -1605,8 +1605,8 @@ public class RMAppImpl implements RMApp, Recoverable { } @Override - public ResourceRequest getAMResourceRequest() { - return this.amReq; + public List getAMResourceRequests() { + return this.amReqs; } @Override @@ -1879,7 +1879,9 @@ public class RMAppImpl implements RMApp, Recoverable { public String getAmNodeLabelExpression() { String amNodeLabelExpression = null; if (!getApplicationSubmissionContext().getUnmanagedAM()) { - amNodeLabelExpression = getAMResourceRequest().getNodeLabelExpression(); + amNodeLabelExpression = + getAMResourceRequests() != null && !getAMResourceRequests().isEmpty() + ? getAMResourceRequests().get(0).getNodeLabelExpression() : null; amNodeLabelExpression = (amNodeLabelExpression == null) ? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : amNodeLabelExpression; amNodeLabelExpression = (amNodeLabelExpression.trim().isEmpty()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 6158f8860e8..1f182a3abdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -192,7 +192,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private Object transitionTodo; private RMAppAttemptMetrics attemptMetrics = null; - private ResourceRequest amReq = null; + private List amReqs = null; private BlacklistManager blacklistedNodesForAM = null; private String amLaunchDiagnostics; @@ -485,16 +485,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { RMContext rmContext, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationSubmissionContext submissionContext, - Configuration conf, ResourceRequest amReq, RMApp rmApp) { + Configuration conf, List amReqs, RMApp rmApp) { this(appAttemptId, rmContext, scheduler, masterService, submissionContext, - conf, amReq, rmApp, new DisabledBlacklistManager()); + conf, amReqs, rmApp, new DisabledBlacklistManager()); } public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, RMContext rmContext, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationSubmissionContext submissionContext, - Configuration conf, ResourceRequest amReq, RMApp rmApp, + Configuration conf, List amReqs, RMApp rmApp, BlacklistManager amBlacklistManager) { this.conf = conf; this.applicationAttemptId = appAttemptId; @@ -514,7 +514,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { this.attemptMetrics = new RMAppAttemptMetrics(applicationAttemptId, rmContext); - this.amReq = amReq; + this.amReqs = amReqs; this.blacklistedNodesForAM = amBlacklistManager; final int diagnosticsLimitKC = getDiagnosticsLimitKCOrThrow(conf); @@ -1092,18 +1092,21 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { // will be passed to scheduler, and scheduler will deduct the number after // AM container allocated - // Currently, following fields are all hard code, + // Currently, following fields are all hard coded, // TODO: change these fields when we want to support - // priority/resource-name/relax-locality specification for AM containers - // allocation. - appAttempt.amReq.setNumContainers(1); - appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY); - appAttempt.amReq.setResourceName(ResourceRequest.ANY); - appAttempt.amReq.setRelaxLocality(true); + // priority or multiple containers AM container allocation. + for (ResourceRequest amReq : appAttempt.amReqs) { + amReq.setNumContainers(1); + amReq.setPriority(AM_CONTAINER_PRIORITY); + } - appAttempt.getAMBlacklistManager().refreshNodeHostCount( + int numNodes = RMServerUtils.getApplicableNodeCountForAM(appAttempt.rmContext, - appAttempt.conf, appAttempt.amReq)); + appAttempt.conf, appAttempt.amReqs); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting node count for blacklist to " + numNodes); + } + appAttempt.getAMBlacklistManager().refreshNodeHostCount(numNodes); ResourceBlacklistRequest amBlacklist = appAttempt.getAMBlacklistManager().getBlacklistUpdates(); @@ -1116,7 +1119,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { Allocation amContainerAllocation = appAttempt.scheduler.allocate( appAttempt.applicationAttemptId, - Collections.singletonList(appAttempt.amReq), + appAttempt.amReqs, EMPTY_CONTAINER_RELEASE_LIST, amBlacklist.getBlacklistAdditions(), amBlacklist.getBlacklistRemovals(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index ce6d2a2fc5d..1d6ce6c88fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1162,4 +1162,9 @@ public abstract class AbstractYarnScheduler return SchedulerUtils.createOpportunisticRmContainer( rmContext, demotedContainer, false); } + + @Override + public List getNodeIds(String resourceName) { + return nodeTracker.getNodeIdsByResourceName(resourceName); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index e487f698d09..010e64506b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -268,6 +268,9 @@ public class ClusterNodeTracker { /** * Convenience method to filter nodes based on a condition. + * + * @param nodeFilter A {@link NodeFilter} for filtering the nodes + * @return A list of filtered nodes */ public List getNodes(NodeFilter nodeFilter) { List nodeList = new ArrayList<>(); @@ -288,6 +291,37 @@ public class ClusterNodeTracker { return nodeList; } + public List getAllNodeIds() { + return getNodeIds(null); + } + + /** + * Convenience method to filter nodes based on a condition. + * + * @param nodeFilter A {@link NodeFilter} for filtering the nodes + * @return A list of filtered nodes + */ + public List getNodeIds(NodeFilter nodeFilter) { + List nodeList = new ArrayList<>(); + readLock.lock(); + try { + if (nodeFilter == null) { + for (N node : nodes.values()) { + nodeList.add(node.getNodeID()); + } + } else { + for (N node : nodes.values()) { + if (nodeFilter.accept(node)) { + nodeList.add(node.getNodeID()); + } + } + } + } finally { + readLock.unlock(); + } + return nodeList; + } + /** * Convenience method to sort nodes. * @@ -320,11 +354,38 @@ public class ClusterNodeTracker { resourceName != null && !resourceName.isEmpty()); List retNodes = new ArrayList<>(); if (ResourceRequest.ANY.equals(resourceName)) { - return getAllNodes(); + retNodes.addAll(getAllNodes()); } else if (nodeNameToNodeMap.containsKey(resourceName)) { retNodes.add(nodeNameToNodeMap.get(resourceName)); } else if (nodesPerRack.containsKey(resourceName)) { - return nodesPerRack.get(resourceName); + retNodes.addAll(nodesPerRack.get(resourceName)); + } else { + LOG.info( + "Could not find a node matching given resourceName " + resourceName); + } + return retNodes; + } + + /** + * Convenience method to return list of {@link NodeId} corresponding to + * resourceName passed in the {@link ResourceRequest}. + * + * @param resourceName Host/rack name of the resource, or + * {@link ResourceRequest#ANY} + * @return list of {@link NodeId} that match the resourceName + */ + public List getNodeIdsByResourceName(final String resourceName) { + Preconditions.checkArgument( + resourceName != null && !resourceName.isEmpty()); + List retNodes = new ArrayList<>(); + if (ResourceRequest.ANY.equals(resourceName)) { + retNodes.addAll(getAllNodeIds()); + } else if (nodeNameToNodeMap.containsKey(resourceName)) { + retNodes.add(nodeNameToNodeMap.get(resourceName).getNodeID()); + } else if (nodesPerRack.containsKey(resourceName)) { + for (N node : nodesPerRack.get(resourceName)) { + retNodes.add(node.getNodeID()); + } } else { LOG.info( "Could not find a node matching given resourceName " + resourceName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java index 5649ccf7dca..d96d62545c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java @@ -19,10 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @@ -49,4 +51,11 @@ public interface ResourceScheduler extends YarnScheduler, Recoverable { * @throws IOException */ void reinitialize(Configuration conf, RMContext rmContext) throws IOException; + + /** + * Get the {@link NodeId} available in the cluster by resource name. + * @param resourceName resource name + * @return the number of available {@link NodeId} by resource name. + */ + List getNodeIds(String resourceName); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index f12efb4015b..e824b6c85db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -139,18 +139,20 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { Resource amResource; String partition; - if (rmApp == null || rmApp.getAMResourceRequest() == null) { + if (rmApp == null || rmApp.getAMResourceRequests() == null + || rmApp.getAMResourceRequests().isEmpty()) { // the rmApp may be undefined (the resource manager checks for this too) // and unmanaged applications do not provide an amResource request // in these cases, provide a default using the scheduler amResource = rmContext.getScheduler().getMinimumResourceCapability(); partition = CommonNodeLabelsManager.NO_LABEL; } else { - amResource = rmApp.getAMResourceRequest().getCapability(); + amResource = rmApp.getAMResourceRequests().get(0).getCapability(); partition = - (rmApp.getAMResourceRequest().getNodeLabelExpression() == null) + (rmApp.getAMResourceRequests().get(0) + .getNodeLabelExpression() == null) ? CommonNodeLabelsManager.NO_LABEL - : rmApp.getAMResourceRequest().getNodeLabelExpression(); + : rmApp.getAMResourceRequests().get(0).getNodeLabelExpression(); } setAppAMNodePartitionName(partition); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java index 4e85b671da7..10e627a4410 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java @@ -229,7 +229,7 @@ public class AppInfo { appNodeLabelExpression = app.getApplicationSubmissionContext().getNodeLabelExpression(); amNodeLabelExpression = (unmanagedApplication) ? null - : app.getAMResourceRequest().getNodeLabelExpression(); + : app.getAMResourceRequests().get(0).getNodeLabelExpression(); // Setting partition based resource usage of application ResourceScheduler scheduler = rm.getRMContext().getScheduler(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index f9f42ad6e54..aca2fc5ec9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -24,6 +24,7 @@ import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -678,6 +679,17 @@ public class MockRM extends ResourceManager { tokensConf); } + public RMApp submitApp(List amResourceRequests) + throws Exception { + return submitApp(amResourceRequests, "app1", + "user", null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, + false, false, null, 0, null, true, + amResourceRequests.get(0).getPriority(), + amResourceRequests.get(0).getNodeLabelExpression(), null, null); + } + public RMApp submitApp(Resource capability, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType, @@ -688,6 +700,30 @@ public class MockRM extends ResourceManager { Map applicationTimeouts, ByteBuffer tokensConf) throws Exception { + priority = (priority == null) ? Priority.newInstance(0) : priority; + ResourceRequest amResourceRequest = ResourceRequest.newInstance( + priority, ResourceRequest.ANY, capability, 1); + if (amLabel != null && !amLabel.isEmpty()) { + amResourceRequest.setNodeLabelExpression(amLabel.trim()); + } + return submitApp(Collections.singletonList(amResourceRequest), name, user, + acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, + keepContainers, isAppIdProvided, applicationId, + attemptFailuresValidityInterval, logAggregationContext, + cancelTokensWhenComplete, priority, amLabel, applicationTimeouts, + tokensConf); + } + + public RMApp submitApp(List amResourceRequests, String name, + String user, Map acls, boolean unmanaged, + String queue, int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext, + boolean cancelTokensWhenComplete, Priority priority, String amLabel, + Map applicationTimeouts, + ByteBuffer tokensConf) + throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -718,7 +754,6 @@ public class MockRM extends ResourceManager { sub.setApplicationType(appType); ContainerLaunchContext clc = Records .newRecord(ContainerLaunchContext.class); - sub.setResource(capability); clc.setApplicationACLs(acls); if (ts != null && UserGroupInformation.isSecurityEnabled()) { DataOutputBuffer dob = new DataOutputBuffer(); @@ -733,12 +768,12 @@ public class MockRM extends ResourceManager { sub.setLogAggregationContext(logAggregationContext); } sub.setCancelTokensWhenComplete(cancelTokensWhenComplete); - ResourceRequest amResourceRequest = ResourceRequest.newInstance( - Priority.newInstance(0), ResourceRequest.ANY, capability, 1); if (amLabel != null && !amLabel.isEmpty()) { - amResourceRequest.setNodeLabelExpression(amLabel.trim()); + for (ResourceRequest amResourceRequest : amResourceRequests) { + amResourceRequest.setNodeLabelExpression(amLabel.trim()); + } } - sub.setAMContainerResourceRequest(amResourceRequest); + sub.setAMContainerResourceRequests(amResourceRequests); req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 1aec76f3004..8ec3eae5d33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -31,6 +31,8 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -50,6 +52,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -57,11 +61,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; @@ -72,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -307,7 +314,7 @@ public class TestAppManager{ ResourceRequest resReg = ResourceRequest.newInstance(Priority.newInstance(0), ResourceRequest.ANY, Resource.newInstance(1024, 1), 1); - sub.setAMContainerResourceRequest(resReg); + sub.setAMContainerResourceRequests(Collections.singletonList(resReg)); req.setApplicationSubmissionContext(sub); sub.setAMContainerSpec(mock(ContainerLaunchContext.class)); try { @@ -517,8 +524,157 @@ public class TestAppManager{ Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, appEventType); } + @SuppressWarnings("deprecation") @Test - public void testRMAppSubmit() throws Exception { + public void testRMAppSubmitAMContainerResourceRequests() throws Exception { + asContext.setResource(Resources.createResource(1024)); + asContext.setAMContainerResourceRequest( + ResourceRequest.newInstance(Priority.newInstance(0), + ResourceRequest.ANY, Resources.createResource(1024), 1, true)); + List reqs = new ArrayList<>(); + reqs.add(ResourceRequest.newInstance(Priority.newInstance(0), + ResourceRequest.ANY, Resources.createResource(1025), 1, false)); + reqs.add(ResourceRequest.newInstance(Priority.newInstance(0), + "/rack", Resources.createResource(1025), 1, false)); + reqs.add(ResourceRequest.newInstance(Priority.newInstance(0), + "/rack/node", Resources.createResource(1025), 1, true)); + asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs)); + // getAMContainerResourceRequest uses the first entry of + // getAMContainerResourceRequests + Assert.assertEquals(reqs.get(0), asContext.getAMContainerResourceRequest()); + Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests()); + RMApp app = testRMAppSubmit(); + for (ResourceRequest req : reqs) { + req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + } + // setAMContainerResourceRequests has priority over + // setAMContainerResourceRequest and setResource + Assert.assertEquals(reqs, app.getAMResourceRequests()); + } + + @SuppressWarnings("deprecation") + @Test + public void testRMAppSubmitAMContainerResourceRequest() throws Exception { + asContext.setResource(Resources.createResource(1024)); + asContext.setAMContainerResourceRequests(null); + ResourceRequest req = + ResourceRequest.newInstance(Priority.newInstance(0), + ResourceRequest.ANY, Resources.createResource(1025), 1, true); + asContext.setAMContainerResourceRequest(cloneResourceRequest(req)); + // getAMContainerResourceRequests uses a singleton list of + // getAMContainerResourceRequest + Assert.assertEquals(req, asContext.getAMContainerResourceRequest()); + Assert.assertEquals(req, asContext.getAMContainerResourceRequests().get(0)); + Assert.assertEquals(1, asContext.getAMContainerResourceRequests().size()); + RMApp app = testRMAppSubmit(); + req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + // setAMContainerResourceRequest has priority over setResource + Assert.assertEquals(Collections.singletonList(req), + app.getAMResourceRequests()); + } + + @Test + public void testRMAppSubmitResource() throws Exception { + asContext.setResource(Resources.createResource(1024)); + asContext.setAMContainerResourceRequests(null); + RMApp app = testRMAppSubmit(); + // setResource + Assert.assertEquals(Collections.singletonList( + ResourceRequest.newInstance(RMAppAttemptImpl.AM_CONTAINER_PRIORITY, + ResourceRequest.ANY, Resources.createResource(1024), 1, true, "")), + app.getAMResourceRequests()); + } + + @Test + public void testRMAppSubmitNoResourceRequests() throws Exception { + asContext.setResource(null); + asContext.setAMContainerResourceRequests(null); + try { + testRMAppSubmit(); + Assert.fail("Should have failed due to no ResourceRequest"); + } catch (InvalidResourceRequestException e) { + Assert.assertEquals( + "Invalid resource request, no resources requested", + e.getMessage()); + } + } + + @Test + public void testRMAppSubmitAMContainerResourceRequestsDisagree() + throws Exception { + asContext.setResource(null); + List reqs = new ArrayList<>(); + ResourceRequest anyReq = ResourceRequest.newInstance( + Priority.newInstance(1), + ResourceRequest.ANY, Resources.createResource(1024), 1, false, "label1", + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)); + reqs.add(anyReq); + reqs.add(ResourceRequest.newInstance(Priority.newInstance(2), + "/rack", Resources.createResource(1025), 2, false, "", + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC))); + reqs.add(ResourceRequest.newInstance(Priority.newInstance(3), + "/rack/node", Resources.createResource(1026), 3, true, "", + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC))); + asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs)); + RMApp app = testRMAppSubmit(); + // It should force the requests to all agree on these points + for (ResourceRequest req : reqs) { + req.setCapability(anyReq.getCapability()); + req.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)); + req.setNumContainers(1); + req.setPriority(Priority.newInstance(0)); + } + Assert.assertEquals(reqs, app.getAMResourceRequests()); + } + + @Test + public void testRMAppSubmitAMContainerResourceRequestsNoAny() + throws Exception { + asContext.setResource(null); + List reqs = new ArrayList<>(); + reqs.add(ResourceRequest.newInstance(Priority.newInstance(1), + "/rack", Resources.createResource(1025), 1, false)); + reqs.add(ResourceRequest.newInstance(Priority.newInstance(1), + "/rack/node", Resources.createResource(1025), 1, true)); + asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs)); + // getAMContainerResourceRequest uses the first entry of + // getAMContainerResourceRequests + Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests()); + try { + testRMAppSubmit(); + Assert.fail("Should have failed due to missing ANY ResourceRequest"); + } catch (InvalidResourceRequestException e) { + Assert.assertEquals( + "Invalid resource request, no resource request specified with *", + e.getMessage()); + } + } + + @Test + public void testRMAppSubmitAMContainerResourceRequestsTwoManyAny() + throws Exception { + asContext.setResource(null); + List reqs = new ArrayList<>(); + reqs.add(ResourceRequest.newInstance(Priority.newInstance(1), + ResourceRequest.ANY, Resources.createResource(1025), 1, false)); + reqs.add(ResourceRequest.newInstance(Priority.newInstance(1), + ResourceRequest.ANY, Resources.createResource(1025), 1, false)); + asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs)); + // getAMContainerResourceRequest uses the first entry of + // getAMContainerResourceRequests + Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests()); + try { + testRMAppSubmit(); + Assert.fail("Should have failed due to too many ANY ResourceRequests"); + } catch (InvalidResourceRequestException e) { + Assert.assertEquals( + "Invalid resource request, only one resource request with * is " + + "allowed", e.getMessage()); + } + } + + private RMApp testRMAppSubmit() throws Exception { appMonitor.submitApplication(asContext, "test"); RMApp app = rmContext.getRMApps().get(appId); Assert.assertNotNull("app is null", app); @@ -529,12 +685,14 @@ public class TestAppManager{ // wait for event to be processed int timeoutSecs = 0; - while ((getAppEventType() == RMAppEventType.KILL) && + while ((getAppEventType() == RMAppEventType.KILL) && timeoutSecs++ < 20) { Thread.sleep(1000); } Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType()); + + return app; } @Test @@ -732,6 +890,15 @@ public class TestAppManager{ ResourceCalculator rs = mock(ResourceCalculator.class); when(scheduler.getResourceCalculator()).thenReturn(rs); + when(scheduler.getNormalizedResource((Resource) any())) + .thenAnswer(new Answer() { + @Override + public Resource answer(InvocationOnMock invocationOnMock) + throws Throwable { + return (Resource) invocationOnMock.getArguments()[0]; + } + }); + return scheduler; } @@ -748,4 +915,26 @@ public class TestAppManager{ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); } + private static ResourceRequest cloneResourceRequest(ResourceRequest req) { + return ResourceRequest.newInstance( + Priority.newInstance(req.getPriority().getPriority()), + new String(req.getResourceName()), + Resource.newInstance(req.getCapability().getMemorySize(), + req.getCapability().getVirtualCores()), + req.getNumContainers(), + req.getRelaxLocality(), + req.getNodeLabelExpression() != null + ? new String(req.getNodeLabelExpression()) : null, + ExecutionTypeRequest.newInstance( + req.getExecutionTypeRequest().getExecutionType())); + } + + private static List cloneResourceRequests( + List reqs) { + List cloneReqs = new ArrayList<>(); + for (ResourceRequest req : reqs) { + cloneReqs.add(cloneResourceRequest(req)); + } + return cloneReqs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 01c1b43dbb0..3c70efe1996 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -1039,9 +1040,9 @@ public class TestClientRMService { spy(new RMAppImpl(applicationId3, rmContext, config, null, null, queueName, asContext, yarnScheduler, null, System.currentTimeMillis(), "YARN", null, - BuilderUtils.newResourceRequest( + Collections.singletonList(BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - Resource.newInstance(1024, 1), 1)){ + Resource.newInstance(1024, 1), 1))){ @Override public ApplicationReport createAndGetApplicationReport( String clientUserName, boolean allowAccess) { @@ -1055,7 +1056,8 @@ public class TestClientRMService { return report; } }); - app.getAMResourceRequest().setNodeLabelExpression(amNodeLabelExpression); + app.getAMResourceRequests().get(0) + .setNodeLabelExpression(amNodeLabelExpression); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( ApplicationId.newInstance(123456, 1), 1); RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index c80a799e7d8..b4adf480b35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -28,6 +29,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -156,6 +160,186 @@ public class TestNodeBlacklistingOnAMFailures { currentNode.getNodeId(), allocatedContainers.get(0).getNodeId()); } + @Test(timeout = 100000) + public void testNodeBlacklistingOnAMFailureStrictNodeLocality() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, + true); + + DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm = startRM(conf, dispatcher); + CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); + + // Register 5 nodes, so that we can blacklist atleast one if AM container + // is failed. As per calculation it will be like, 5nodes * 0.2 (default)=1. + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = + new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService()); + nm2.registerNode(); + + MockNM nm3 = + new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService()); + nm3.registerNode(); + + MockNM nm4 = + new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService()); + nm4.registerNode(); + + MockNM nm5 = + new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService()); + nm5.registerNode(); + + // Specify a strict locality on nm2 + List reqs = new ArrayList<>(); + ResourceRequest nodeReq = ResourceRequest.newInstance( + Priority.newInstance(0), nm2.getNodeId().getHost(), + Resource.newInstance(200, 1), 1, true); + ResourceRequest rackReq = ResourceRequest.newInstance( + Priority.newInstance(0), "/default-rack", + Resource.newInstance(200, 1), 1, false); + ResourceRequest anyReq = ResourceRequest.newInstance( + Priority.newInstance(0), ResourceRequest.ANY, + Resource.newInstance(200, 1), 1, false); + reqs.add(anyReq); + reqs.add(rackReq); + reqs.add(nodeReq); + RMApp app = rm.submitApp(reqs); + + MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm2); + ContainerId amContainerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + RMContainer rmContainer = scheduler.getRMContainer(amContainerId); + NodeId nodeWhereAMRan = rmContainer.getAllocatedNode(); + Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan); + + // Set the exist status to INVALID so that we can verify that the system + // automatically blacklisting the node + makeAMContainerExit(rm, amContainerId, nm2, ContainerExitStatus.INVALID); + + // restart the am + RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm); + System.out.println("New AppAttempt launched " + attempt.getAppAttemptId()); + + nm2.nodeHeartbeat(true); + dispatcher.await(); + + // Now the AM container should be allocated + MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + + MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId()); + rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + amContainerId = + ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); + rmContainer = scheduler.getRMContainer(amContainerId); + nodeWhereAMRan = rmContainer.getAllocatedNode(); + + // The second AM should be on the same node because the strict locality + // made the eligible nodes only 1, so the blacklisting threshold kicked in + System.out.println("AM ran on " + nodeWhereAMRan); + Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan); + + am2.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + } + + @Test(timeout = 100000) + public void testNodeBlacklistingOnAMFailureRelaxedNodeLocality() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, + true); + + DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm = startRM(conf, dispatcher); + CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); + + // Register 5 nodes, so that we can blacklist atleast one if AM container + // is failed. As per calculation it will be like, 5nodes * 0.2 (default)=1. + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = + new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService()); + nm2.registerNode(); + + MockNM nm3 = + new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService()); + nm3.registerNode(); + + MockNM nm4 = + new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService()); + nm4.registerNode(); + + MockNM nm5 = + new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService()); + nm5.registerNode(); + + // Specify a relaxed locality on nm2 + List reqs = new ArrayList<>(); + ResourceRequest nodeReq = ResourceRequest.newInstance( + Priority.newInstance(0), nm2.getNodeId().getHost(), + Resource.newInstance(200, 1), 1, true); + ResourceRequest rackReq = ResourceRequest.newInstance( + Priority.newInstance(0), "/default-rack", + Resource.newInstance(200, 1), 1, true); + ResourceRequest anyReq = ResourceRequest.newInstance( + Priority.newInstance(0), ResourceRequest.ANY, + Resource.newInstance(200, 1), 1, true); + reqs.add(anyReq); + reqs.add(rackReq); + reqs.add(nodeReq); + RMApp app = rm.submitApp(reqs); + + MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm2); + ContainerId amContainerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + RMContainer rmContainer = scheduler.getRMContainer(amContainerId); + NodeId nodeWhereAMRan = rmContainer.getAllocatedNode(); + Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan); + + // Set the exist status to INVALID so that we can verify that the system + // automatically blacklisting the node + makeAMContainerExit(rm, amContainerId, nm2, ContainerExitStatus.INVALID); + + // restart the am + RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm); + System.out.println("New AppAttempt launched " + attempt.getAppAttemptId()); + + nm2.nodeHeartbeat(true); + nm1.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + nm5.nodeHeartbeat(true); + dispatcher.await(); + + // Now the AM container should be allocated + MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); + + MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId()); + rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + amContainerId = + ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); + rmContainer = scheduler.getRMContainer(amContainerId); + nodeWhereAMRan = rmContainer.getAllocatedNode(); + + // The second AM should be on a different node because the relaxed locality + // made the app schedulable on other nodes and nm2 is blacklisted + System.out.println("AM ran on " + nodeWhereAMRan); + Assert.assertNotEquals(nm2.getNodeId(), nodeWhereAMRan); + + am2.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + } + @Test(timeout = 100000) public void testNoBlacklistingForNonSystemErrors() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMServerUtils.java new file mode 100644 index 00000000000..078b8fd3290 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMServerUtils.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TestRMServerUtils { + @Test + public void testGetApplicableNodeCountForAMLocality() throws Exception { + List rack1Nodes = new ArrayList<>(); + for (int i = 0; i < 29; i++) { + rack1Nodes.add(NodeId.newInstance("host" + i, 1234)); + } + NodeId node1 = NodeId.newInstance("node1", 1234); + NodeId node2 = NodeId.newInstance("node2", 1234); + rack1Nodes.add(node2); + + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false); + ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class); + Mockito.when(scheduler.getNumClusterNodes()).thenReturn(100); + Mockito.when(scheduler.getNodeIds("/rack1")).thenReturn(rack1Nodes); + Mockito.when(scheduler.getNodeIds("node1")) + .thenReturn(Collections.singletonList(node1)); + Mockito.when(scheduler.getNodeIds("node2")) + .thenReturn(Collections.singletonList(node2)); + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getScheduler()).thenReturn(scheduler); + + ResourceRequest anyReq = createResourceRequest(ResourceRequest.ANY, + true, null); + List reqs = new ArrayList<>(); + reqs.add(anyReq); + Assert.assertEquals(100, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + + ResourceRequest rackReq = createResourceRequest("/rack1", true, null); + reqs.add(rackReq); + Assert.assertEquals(30, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + anyReq.setRelaxLocality(false); + Assert.assertEquals(30, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + rackReq.setRelaxLocality(false); + Assert.assertEquals(100, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + + ResourceRequest node1Req = createResourceRequest("node1", false, null); + reqs.add(node1Req); + Assert.assertEquals(100, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node1Req.setRelaxLocality(true); + Assert.assertEquals(1, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + rackReq.setRelaxLocality(true); + Assert.assertEquals(31, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + + ResourceRequest node2Req = createResourceRequest("node2", false, null); + reqs.add(node2Req); + Assert.assertEquals(31, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node2Req.setRelaxLocality(true); + Assert.assertEquals(31, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + rackReq.setRelaxLocality(false); + Assert.assertEquals(2, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node1Req.setRelaxLocality(false); + Assert.assertEquals(1, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node2Req.setRelaxLocality(false); + Assert.assertEquals(100, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + } + + @Test + public void testGetApplicableNodeCountForAMLabels() throws Exception { + Set noLabelNodes = new HashSet<>(); + for (int i = 0; i < 80; i++) { + noLabelNodes.add(NodeId.newInstance("host" + i, 1234)); + } + Set label1Nodes = new HashSet<>(); + for (int i = 80; i < 90; i++) { + label1Nodes.add(NodeId.newInstance("host" + i, 1234)); + } + label1Nodes.add(NodeId.newInstance("host101", 0)); + label1Nodes.add(NodeId.newInstance("host102", 0)); + Map> label1NodesMap = new HashMap<>(); + label1NodesMap.put("label1", label1Nodes); + + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class); + Mockito.when(scheduler.getNumClusterNodes()).thenReturn(100); + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getScheduler()).thenReturn(scheduler); + RMNodeLabelsManager labMan = Mockito.mock(RMNodeLabelsManager.class); + Mockito.when(labMan.getNodesWithoutALabel()).thenReturn(noLabelNodes); + Mockito.when(labMan.getLabelsToNodes(Collections.singleton("label1"))) + .thenReturn(label1NodesMap); + Mockito.when(rmContext.getNodeLabelManager()).thenReturn(labMan); + + ResourceRequest anyReq = createResourceRequest(ResourceRequest.ANY, + true, null); + List reqs = new ArrayList<>(); + reqs.add(anyReq); + Assert.assertEquals(80, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + anyReq.setNodeLabelExpression("label1"); + Assert.assertEquals(10, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + } + + @Test + public void testGetApplicableNodeCountForAMLocalityAndLabels() + throws Exception { + List rack1Nodes = new ArrayList<>(); + for (int i = 0; i < 29; i++) { + rack1Nodes.add(NodeId.newInstance("host" + i, 1234)); + } + NodeId node1 = NodeId.newInstance("node1", 1234); + NodeId node2 = NodeId.newInstance("node2", 1234); + rack1Nodes.add(node2); + Set noLabelNodes = new HashSet<>(); + for (int i = 0; i < 19; i++) { + noLabelNodes.add(rack1Nodes.get(i)); + } + noLabelNodes.add(node2); + for (int i = 29; i < 89; i++) { + noLabelNodes.add(NodeId.newInstance("host" + i, 1234)); + } + Set label1Nodes = new HashSet<>(); + label1Nodes.add(node1); + for (int i = 89; i < 93; i++) { + label1Nodes.add(NodeId.newInstance("host" + i, 1234)); + } + for (int i = 19; i < 29; i++) { + label1Nodes.add(rack1Nodes.get(i)); + } + label1Nodes.add(NodeId.newInstance("host101", 0)); + label1Nodes.add(NodeId.newInstance("host102", 0)); + Map> label1NodesMap = new HashMap<>(); + label1NodesMap.put("label1", label1Nodes); + + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class); + Mockito.when(scheduler.getNumClusterNodes()).thenReturn(100); + Mockito.when(scheduler.getNodeIds("/rack1")).thenReturn(rack1Nodes); + Mockito.when(scheduler.getNodeIds("node1")) + .thenReturn(Collections.singletonList(node1)); + Mockito.when(scheduler.getNodeIds("node2")) + .thenReturn(Collections.singletonList(node2)); + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getScheduler()).thenReturn(scheduler); + RMNodeLabelsManager labMan = Mockito.mock(RMNodeLabelsManager.class); + Mockito.when(labMan.getNodesWithoutALabel()).thenReturn(noLabelNodes); + Mockito.when(labMan.getLabelsToNodes(Collections.singleton("label1"))) + .thenReturn(label1NodesMap); + Mockito.when(rmContext.getNodeLabelManager()).thenReturn(labMan); + + ResourceRequest anyReq = createResourceRequest(ResourceRequest.ANY, + true, null); + List reqs = new ArrayList<>(); + reqs.add(anyReq); + Assert.assertEquals(80, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + + ResourceRequest rackReq = createResourceRequest("/rack1", true, null); + reqs.add(rackReq); + Assert.assertEquals(20, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + anyReq.setRelaxLocality(false); + Assert.assertEquals(20, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + rackReq.setRelaxLocality(false); + Assert.assertEquals(80, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + + ResourceRequest node1Req = createResourceRequest("node1", false, null); + reqs.add(node1Req); + Assert.assertEquals(80, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node1Req.setRelaxLocality(true); + Assert.assertEquals(0, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + rackReq.setRelaxLocality(true); + Assert.assertEquals(20, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + + ResourceRequest node2Req = createResourceRequest("node2", false, null); + reqs.add(node2Req); + Assert.assertEquals(20, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node2Req.setRelaxLocality(true); + Assert.assertEquals(20, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + rackReq.setRelaxLocality(false); + Assert.assertEquals(1, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node1Req.setRelaxLocality(false); + Assert.assertEquals(1, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node2Req.setRelaxLocality(false); + Assert.assertEquals(80, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + + anyReq.setNodeLabelExpression("label1"); + rackReq.setNodeLabelExpression("label1"); + node1Req.setNodeLabelExpression("label1"); + node2Req.setNodeLabelExpression("label1"); + anyReq.setRelaxLocality(true); + reqs = new ArrayList<>(); + reqs.add(anyReq); + Assert.assertEquals(15, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + + rackReq.setRelaxLocality(true); + reqs.add(rackReq); + Assert.assertEquals(10, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + anyReq.setRelaxLocality(false); + Assert.assertEquals(10, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + rackReq.setRelaxLocality(false); + Assert.assertEquals(15, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + + node1Req.setRelaxLocality(false); + reqs.add(node1Req); + Assert.assertEquals(15, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node1Req.setRelaxLocality(true); + Assert.assertEquals(1, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + rackReq.setRelaxLocality(true); + Assert.assertEquals(11, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + + node2Req.setRelaxLocality(false); + reqs.add(node2Req); + Assert.assertEquals(11, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node2Req.setRelaxLocality(true); + Assert.assertEquals(11, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + rackReq.setRelaxLocality(false); + Assert.assertEquals(1, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node1Req.setRelaxLocality(false); + Assert.assertEquals(0, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + node2Req.setRelaxLocality(false); + Assert.assertEquals(15, + RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs)); + } + + private ResourceRequest createResourceRequest(String resource, + boolean relaxLocality, String nodeLabel) { + return ResourceRequest.newInstance(Priority.newInstance(0), + resource, Resource.newInstance(1, 1), 1, relaxLocality, nodeLabel); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 3d3aeaa41b7..b9adf78e32d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -57,7 +57,7 @@ import com.google.common.collect.Lists; public abstract class MockAsm extends MockApps { public static class ApplicationBase implements RMApp { - ResourceRequest amReq; + List amReqs; @Override public String getUser() { throw new UnsupportedOperationException("Not supported yet."); @@ -192,8 +192,8 @@ public abstract class MockAsm extends MockApps { } @Override - public ResourceRequest getAMResourceRequest() { - return this.amReq; + public List getAMResourceRequests() { + return this.amReqs; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java index f6290ebda4f..56dfe6eb0f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java @@ -525,7 +525,8 @@ public class TestSystemMetricsPublisher { when(app.getAppNodeLabelExpression()).thenCallRealMethod(); ResourceRequest amReq = mock(ResourceRequest.class); when(amReq.getNodeLabelExpression()).thenReturn("high-mem"); - when(app.getAMResourceRequest()).thenReturn(amReq); + when(app.getAMResourceRequests()) + .thenReturn(Collections.singletonList(amReq)); when(app.getAmNodeLabelExpression()).thenCallRealMethod(); when(app.getApplicationPriority()).thenReturn(Priority.newInstance(10)); when(app.getCallerContext()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index bcc4be1ca5b..d3639085977 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; @@ -62,14 +63,14 @@ public class MockRMApp implements RMApp { StringBuilder diagnostics = new StringBuilder(); RMAppAttempt attempt; int maxAppAttempts = 1; - ResourceRequest amReq; + List amReqs; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; id = MockApps.newAppID(newid); state = newState; - amReq = ResourceRequest.newInstance(Priority.UNDEFINED, "0.0.0.0", - Resource.newInstance(0, 0), 1); + amReqs = Collections.singletonList(ResourceRequest.newInstance( + Priority.UNDEFINED, "0.0.0.0", Resource.newInstance(0, 0), 1)); } public MockRMApp(int newid, long time, RMAppState newState, String userName) { @@ -276,8 +277,8 @@ public class MockRMApp implements RMApp { } @Override - public ResourceRequest getAMResourceRequest() { - return this.amReq; + public List getAMResourceRequests() { + return this.amReqs; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 9977683898f..1ec951023fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -30,8 +30,10 @@ import static org.mockito.Mockito.verify; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Map; import org.apache.commons.logging.Log; @@ -267,7 +269,8 @@ public class TestRMAppTransitions { submissionContext.setAMContainerSpec(mock(ContainerLaunchContext.class)); RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, user, queue, submissionContext, scheduler, masterService, - System.currentTimeMillis(), "YARN", null, mock(ResourceRequest.class)); + System.currentTimeMillis(), "YARN", null, + new ArrayList()); testAppStartState(applicationId, user, name, queue, application); this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), @@ -1020,9 +1023,9 @@ public class TestRMAppTransitions { submissionContext.getQueue(), submissionContext, scheduler, null, appState.getSubmitTime(), submissionContext.getApplicationType(), submissionContext.getApplicationTags(), - BuilderUtils.newResourceRequest( + Collections.singletonList(BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1)); + submissionContext.getResource(), 1))); Assert.assertEquals(RMAppState.NEW, application.getState()); RMAppEvent recoverEvent = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index ced5bd9b8d8..9a4b6dc7f72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -328,9 +328,9 @@ public class TestRMAppAttemptTransitions { applicationAttempt = new RMAppAttemptImpl(applicationAttemptId, spyRMContext, scheduler, masterService, submissionContext, new Configuration(), - BuilderUtils.newResourceRequest( + Collections.singletonList(BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1), application); + submissionContext.getResource(), 1)), application); when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt); when(application.getApplicationId()).thenReturn(applicationId); @@ -1108,9 +1108,9 @@ public class TestRMAppAttemptTransitions { new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext, scheduler,masterService, submissionContext, myConf, - BuilderUtils.newResourceRequest( + Collections.singletonList(BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1), application); + submissionContext.getResource(), 1)), application); //submit, schedule and allocate app attempt myApplicationAttempt.handle( @@ -1584,9 +1584,9 @@ public class TestRMAppAttemptTransitions { applicationAttempt = new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext, scheduler, masterService, submissionContext, new Configuration(), - BuilderUtils.newResourceRequest( + Collections.singletonList(BuilderUtils.newResourceRequest( RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1), application); + submissionContext.getResource(), 1)), application); when(submissionContext.getKeepContainersAcrossApplicationAttempts()) .thenReturn(true); when(submissionContext.getMaxAppAttempts()).thenReturn(1); @@ -1645,9 +1645,10 @@ public class TestRMAppAttemptTransitions { applicationAttempt = new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext, scheduler, masterService, submissionContext, - new Configuration(), ResourceRequest.newInstance( - Priority.UNDEFINED, "host1", Resource.newInstance(3333, 1), 3, - false, "label-expression"), application); + new Configuration(), Collections.singletonList( + ResourceRequest.newInstance(Priority.UNDEFINED, "host1", + Resource.newInstance(3333, 1), 3, + false, "label-expression")), application); new RMAppAttemptImpl.ScheduleTransition().transition( (RMAppAttemptImpl) applicationAttempt, null); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 2884f6708aa..37b0da8d6d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -613,7 +614,8 @@ public class TestApplicationLimits { ResourceRequest amResourceRequest = mock(ResourceRequest.class); Resource amResource = Resources.createResource(0, 0); when(amResourceRequest.getCapability()).thenReturn(amResource); - when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); + when(rmApp.getAMResourceRequests()).thenReturn( + Collections.singletonList(amResourceRequest)); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); when(spyRMContext.getRMApps()).thenReturn(spyApps); RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java index 2fa06e84788..547571e3a5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -639,7 +640,8 @@ public class TestApplicationLimitsByPartition { ResourceRequest amResourceRequest = mock(ResourceRequest.class); Resource amResource = Resources.createResource(0, 0); when(amResourceRequest.getCapability()).thenReturn(amResource); - when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); + when(rmApp.getAMResourceRequests()).thenReturn( + Collections.singletonList(amResourceRequest)); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); when(spyRMContext.getRMApps()).thenReturn(spyApps); RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index ba64ed799c9..37ebefd7e53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -3203,8 +3203,8 @@ public class TestCapacityScheduler { RMApp rmApp = rm.submitApp(amMemory, "app-1", "user_0", null, queueName); assertEquals("RMApp does not containes minimum allocation", - minAllocResource, rmApp.getAMResourceRequest().getCapability()); - + minAllocResource, rmApp.getAMResourceRequests().get(0).getCapability()); + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); LeafQueue queueA = (LeafQueue) ((CapacityScheduler) scheduler).getQueue(queueName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 50dc92ed897..a59a3cc2f89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -153,7 +153,8 @@ public class TestLeafQueue { amResourceRequest = mock(ResourceRequest.class); when(amResourceRequest.getCapability()).thenReturn( Resources.createResource(0, 0)); - when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest); + when(rmApp.getAMResourceRequests()).thenReturn( + Collections.singletonList(amResourceRequest)); Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); when(spyRMContext.getRMApps()).thenReturn(spyApps); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 670793d36e3..3bbb5771308 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3206,6 +3206,84 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals(1, app.getLiveContainers().size()); } + @Test + public void testAMStrictLocalityRack() throws IOException { + testAMStrictLocality(false, false); + } + + @Test + public void testAMStrictLocalityNode() throws IOException { + testAMStrictLocality(true, false); + } + + @Test + public void testAMStrictLocalityRackInvalid() throws IOException { + testAMStrictLocality(false, true); + } + + @Test + public void testAMStrictLocalityNodeInvalid() throws IOException { + testAMStrictLocality(true, true); + } + + private void testAMStrictLocality(boolean node, boolean invalid) + throws IOException { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = MockNodes.newNodeInfo(2, Resources.createResource(1024), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + List reqs = new ArrayList<>(); + ResourceRequest nodeRequest = createResourceRequest(1024, + node2.getHostName(), 1, 1, true); + if (node && invalid) { + nodeRequest.setResourceName("invalid"); + } + ResourceRequest rackRequest = createResourceRequest(1024, + node2.getRackName(), 1, 1, !node); + if (!node && invalid) { + rackRequest.setResourceName("invalid"); + } + ResourceRequest anyRequest = createResourceRequest(1024, + ResourceRequest.ANY, 1, 1, false); + reqs.add(anyRequest); + reqs.add(rackRequest); + if (node) { + reqs.add(nodeRequest); + } + + ApplicationAttemptId attId1 = + createSchedulingRequest("queue1", "user1", reqs); + + scheduler.update(); + + NodeUpdateSchedulerEvent node2UpdateEvent = + new NodeUpdateSchedulerEvent(node2); + + FSAppAttempt app = scheduler.getSchedulerApp(attId1); + + // node2 should get the container + scheduler.handle(node2UpdateEvent); + if (invalid) { + assertEquals(0, app.getLiveContainers().size()); + assertEquals(0, scheduler.getNode(node2.getNodeID()).getNumContainers()); + assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers()); + } else { + assertEquals(1, app.getLiveContainers().size()); + assertEquals(1, scheduler.getNode(node2.getNodeID()).getNumContainers()); + assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers()); + } + } + /** * Strict locality requests shouldn't reserve resources on another node. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java index c1815bf2b33..dadb0753da6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java @@ -1168,8 +1168,8 @@ public class TestRMWebServicesApps extends JerseyTestBase { assertEquals(app1.getApplicationId().toString(), appInfo.getAppId()); assertEquals(app1.getName(), appInfo.getName()); assertEquals(app1.createApplicationState(), appInfo.getState()); - assertEquals(app1.getAMResourceRequest().getCapability().getMemorySize(), - appInfo.getAllocatedMB()); + assertEquals(app1.getAMResourceRequests().get(0).getCapability() + .getMemorySize(), appInfo.getAllocatedMB()); rm.stop(); } @@ -1378,7 +1378,7 @@ public class TestRMWebServicesApps extends JerseyTestBase { expectedNumberOfElements++; appNodeLabelExpression = info.getString("appNodeLabelExpression"); } - if (app.getAMResourceRequest().getNodeLabelExpression() != null) { + if (app.getAMResourceRequests().get(0).getNodeLabelExpression() != null) { expectedNumberOfElements++; amNodeLabelExpression = info.getString("amNodeLabelExpression"); } @@ -1485,7 +1485,7 @@ public class TestRMWebServicesApps extends JerseyTestBase { app.getApplicationSubmissionContext().getNodeLabelExpression(), appNodeLabelExpression); assertEquals("unmanagedApplication doesn't match", - app.getAMResourceRequest().getNodeLabelExpression(), + app.getAMResourceRequests().get(0).getNodeLabelExpression(), amNodeLabelExpression); assertEquals("amRPCAddress", AppInfo.getAmRPCAddressFromRMAppAttempt(app.getCurrentAppAttempt()), @@ -1512,7 +1512,7 @@ public class TestRMWebServicesApps extends JerseyTestBase { String nodeLabelExpression, int numContainers, boolean relaxLocality, int priority, String resourceName, long memory, long vCores, String executionType, boolean enforceExecutionType) { - ResourceRequest request = app.getAMResourceRequest(); + ResourceRequest request = app.getAMResourceRequests().get(0); assertEquals("nodeLabelExpression doesn't match", request.getNodeLabelExpression(), nodeLabelExpression); assertEquals("numContainers doesn't match", request.getNumContainers(),