From 6a69239d867070ee85d79026542033ac661c4c1c Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Fri, 4 May 2018 14:59:59 -0700 Subject: [PATCH] YARN-8163. Add support for Node Labels in opportunistic scheduling. Contributed by Abhishek Modi. --- .../api/protocolrecords/RemoteNode.java | 40 ++++++++++++++++++- .../impl/pb/RemoteNodePBImpl.java | 19 +++++++++ .../OpportunisticContainerAllocator.java | 38 +++++++++++++++--- .../yarn_server_common_service_protos.proto | 1 + .../TestOpportunisticContainerAllocator.java | 37 +++++++++++++++++ ...ortunisticContainerAllocatorAMService.java | 12 ++++++ ...ortunisticContainerAllocatorAMService.java | 10 ++++- 7 files changed, 149 insertions(+), 8 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java index f621aa209a5..67ad5bac294 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java @@ -64,6 +64,26 @@ public abstract class RemoteNode implements Comparable { return remoteNode; } + /** + * Create new Instance. + * @param nodeId NodeId. + * @param httpAddress Http address. + * @param rackName Rack Name. + * @param nodePartition Node Partition. + * @return RemoteNode Instance. + */ + @Private + @Unstable + public static RemoteNode newInstance(NodeId nodeId, String httpAddress, + String rackName, String nodePartition) { + RemoteNode remoteNode = Records.newRecord(RemoteNode.class); + remoteNode.setNodeId(nodeId); + remoteNode.setHttpAddress(httpAddress); + remoteNode.setRackName(rackName); + remoteNode.setNodePartition(nodePartition); + return remoteNode; + } + /** * Get {@link NodeId}. * @return NodeId. @@ -117,6 +137,23 @@ public abstract class RemoteNode implements Comparable { * @param other RemoteNode. * @return Comparison. */ + + /** + * Get Node Partition. + * @return Node Partition. + */ + @Private + @Unstable + public abstract String getNodePartition(); + + /** + * Set Node Partition. + * @param nodePartition + */ + @Private + @Unstable + public abstract void setNodePartition(String nodePartition); + @Override public int compareTo(RemoteNode other) { return this.getNodeId().compareTo(other.getNodeId()); @@ -127,6 +164,7 @@ public abstract class RemoteNode implements Comparable { return "RemoteNode{" + "nodeId=" + getNodeId() + ", " + "rackName=" + getRackName() + ", " + - "httpAddress=" + getHttpAddress() + "}"; + "httpAddress=" + getHttpAddress() + ", " + + "partition=" + getNodePartition() + "}"; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java index c2492cf4663..8fb4357c8df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java @@ -136,6 +136,25 @@ public class RemoteNodePBImpl extends RemoteNode { builder.setRackName(rackName); } + @Override + public String getNodePartition() { + RemoteNodeProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodePartition()) { + return null; + } + return (p.getNodePartition()); + } + + @Override + public void setNodePartition(String nodePartition) { + maybeInitBuilder(); + if (nodePartition == null) { + builder.clearNodePartition(); + return; + } + builder.setNodePartition(nodePartition); + } + @Override public int hashCode() { return getProto().hashCode(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 1f53648d55d..ae1ba437a50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.scheduler; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -461,10 +462,17 @@ public class OpportunisticContainerAllocator { private Collection findNodeCandidates(int loopIndex, Map allNodes, Set blackList, EnrichedResourceRequest enrichedRR) { + LinkedList retList = new LinkedList<>(); + String partition = getRequestPartition(enrichedRR); if (loopIndex > 1) { - return allNodes.values(); + for (RemoteNode remoteNode : allNodes.values()) { + if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { + retList.add(remoteNode); + } + } + return retList; } else { - LinkedList retList = new LinkedList<>(); + int numContainers = enrichedRR.getRequest().getNumContainers(); while (numContainers > 0) { if (loopIndex == 0) { @@ -489,8 +497,10 @@ public class OpportunisticContainerAllocator { private int collectRackLocalCandidates(Map allNodes, EnrichedResourceRequest enrichedRR, LinkedList retList, Set blackList, int numContainers) { + String partition = getRequestPartition(enrichedRR); for (RemoteNode rNode : allNodes.values()) { - if (enrichedRR.getRackLocations().contains(rNode.getRackName())) { + if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) && + enrichedRR.getRackLocations().contains(rNode.getRackName())) { if (blackList.contains(rNode.getNodeId().getHost())) { retList.addLast(rNode); } else { @@ -508,9 +518,11 @@ public class OpportunisticContainerAllocator { private int collectNodeLocalCandidates(Map allNodes, EnrichedResourceRequest enrichedRR, List retList, int numContainers) { + String partition = getRequestPartition(enrichedRR); for (String nodeName : enrichedRR.getNodeLocations()) { RemoteNode remoteNode = allNodes.get(nodeName); - if (remoteNode != null) { + if (remoteNode != null && + StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { retList.add(remoteNode); numContainers--; } @@ -563,7 +575,7 @@ public class OpportunisticContainerAllocator { capability, currTime + tokenExpiry, tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier, schedulerKey.getPriority(), currTime, - null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, + null, getRemoteNodePartition(node), ContainerType.TASK, ExecutionType.OPPORTUNISTIC, schedulerKey.getAllocationRequestId()); byte[] pwd = tokenSecretManager.createPassword(containerTokenIdentifier); @@ -616,4 +628,20 @@ public class OpportunisticContainerAllocator { } return partitionedRequests; } + + private String getRequestPartition(EnrichedResourceRequest enrichedRR) { + String partition = enrichedRR.getRequest().getNodeLabelExpression(); + if (partition == null) { + partition = CommonNodeLabelsManager.NO_LABEL; + } + return partition; + } + + private String getRemoteNodePartition(RemoteNode node) { + String partition = node.getNodePartition(); + if (partition == null) { + partition = CommonNodeLabelsManager.NO_LABEL; + } + return partition; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 1b090bf232b..387ddb43213 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -31,6 +31,7 @@ message RemoteNodeProto { optional NodeIdProto node_id = 1; optional string http_address = 2; optional string rack_name = 3; + optional string node_partition = 4; } message RegisterDistributedSchedulingAMResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java index 788b0b386a8..2d3b09914c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -596,4 +596,41 @@ public class TestOpportunisticContainerAllocator { } Assert.assertEquals(100, containers.size()); } + + @Test + public void testAllocationWithNodeLabels() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, "label", + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"))); + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + /* Since there is no node satisfying node label constraints, requests + won't get fulfilled. + */ + Assert.assertEquals(0, containers.size()); + Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size()); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1", + "label"))); + + containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Assert.assertEquals(1, containers.size()); + Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index ce425dfaac4..9b136272885 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; @@ -174,6 +175,16 @@ public class OpportunisticContainerAllocatorAMService appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); + if (!partitionedAsks.getOpportunistic().isEmpty()) { + String appPartition = appAttempt.getAppAMNodePartitionName(); + + for (ResourceRequest req : partitionedAsks.getOpportunistic()) { + if (null == req.getNodeLabelExpression()) { + req.setNodeLabelExpression(appPartition); + } + } + } + List oppContainers = oppContainerAllocator.allocateContainers( request.getResourceBlacklistRequest(), @@ -436,6 +447,7 @@ public class OpportunisticContainerAllocatorAMService if (node != null) { RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress()); rNode.setRackName(node.getRackName()); + rNode.setNodePartition(node.getPartition()); return rNode; } return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index efa76bcf920..5542157c940 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -927,6 +927,8 @@ public class TestOpportunisticContainerAllocatorAMService { distAllReq.getProto())); Assert.assertEquals( "h1", dsAllocResp.getNodesForScheduling().get(0).getNodeId().getHost()); + Assert.assertEquals( + "l1", dsAllocResp.getNodesForScheduling().get(1).getNodePartition()); FinishApplicationMasterResponse dsfinishResp = new FinishApplicationMasterResponsePBImpl( @@ -1004,9 +1006,13 @@ public class TestOpportunisticContainerAllocatorAMService { .getExecutionTypeRequest().getEnforceExecutionType()); DistributedSchedulingAllocateResponse resp = factory .newRecordInstance(DistributedSchedulingAllocateResponse.class); + RemoteNode remoteNode1 = RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "http://h1:4321"); + RemoteNode remoteNode2 = RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "http://h2:4321"); + remoteNode2.setNodePartition("l1"); resp.setNodesForScheduling( - Arrays.asList(RemoteNode.newInstance( - NodeId.newInstance("h1", 1234), "http://h1:4321"))); + Arrays.asList(remoteNode1, remoteNode2)); return resp; } };