YARN-8163. Add support for Node Labels in opportunistic scheduling. Contributed by Abhishek Modi.

This commit is contained in:
Inigo Goiri 2018-05-04 14:59:59 -07:00
parent 4cdbdce752
commit 6a69239d86
7 changed files with 149 additions and 8 deletions

View File

@ -64,6 +64,26 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
return remoteNode;
}
/**
* Create new Instance.
* @param nodeId NodeId.
* @param httpAddress Http address.
* @param rackName Rack Name.
* @param nodePartition Node Partition.
* @return RemoteNode Instance.
*/
@Private
@Unstable
public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
String rackName, String nodePartition) {
RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
remoteNode.setNodeId(nodeId);
remoteNode.setHttpAddress(httpAddress);
remoteNode.setRackName(rackName);
remoteNode.setNodePartition(nodePartition);
return remoteNode;
}
/**
* Get {@link NodeId}.
* @return NodeId.
@ -117,6 +137,23 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
* @param other RemoteNode.
* @return Comparison.
*/
/**
* Get Node Partition.
* @return Node Partition.
*/
@Private
@Unstable
public abstract String getNodePartition();
/**
* Set Node Partition.
* @param nodePartition
*/
@Private
@Unstable
public abstract void setNodePartition(String nodePartition);
@Override
public int compareTo(RemoteNode other) {
return this.getNodeId().compareTo(other.getNodeId());
@ -127,6 +164,7 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
return "RemoteNode{" +
"nodeId=" + getNodeId() + ", " +
"rackName=" + getRackName() + ", " +
"httpAddress=" + getHttpAddress() + "}";
"httpAddress=" + getHttpAddress() + ", " +
"partition=" + getNodePartition() + "}";
}
}

View File

@ -136,6 +136,25 @@ public class RemoteNodePBImpl extends RemoteNode {
builder.setRackName(rackName);
}
@Override
public String getNodePartition() {
RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodePartition()) {
return null;
}
return (p.getNodePartition());
}
@Override
public void setNodePartition(String nodePartition) {
maybeInitBuilder();
if (nodePartition == null) {
builder.clearNodePartition();
return;
}
builder.setNodePartition(nodePartition);
}
@Override
public int hashCode() {
return getProto().hashCode();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.scheduler;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -461,10 +462,17 @@ public class OpportunisticContainerAllocator {
private Collection<RemoteNode> findNodeCandidates(int loopIndex,
Map<String, RemoteNode> allNodes, Set<String> blackList,
EnrichedResourceRequest enrichedRR) {
if (loopIndex > 1) {
return allNodes.values();
} else {
LinkedList<RemoteNode> retList = new LinkedList<>();
String partition = getRequestPartition(enrichedRR);
if (loopIndex > 1) {
for (RemoteNode remoteNode : allNodes.values()) {
if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
retList.add(remoteNode);
}
}
return retList;
} else {
int numContainers = enrichedRR.getRequest().getNumContainers();
while (numContainers > 0) {
if (loopIndex == 0) {
@ -489,8 +497,10 @@ public class OpportunisticContainerAllocator {
private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
Set<String> blackList, int numContainers) {
String partition = getRequestPartition(enrichedRR);
for (RemoteNode rNode : allNodes.values()) {
if (enrichedRR.getRackLocations().contains(rNode.getRackName())) {
if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
enrichedRR.getRackLocations().contains(rNode.getRackName())) {
if (blackList.contains(rNode.getNodeId().getHost())) {
retList.addLast(rNode);
} else {
@ -508,9 +518,11 @@ public class OpportunisticContainerAllocator {
private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
int numContainers) {
String partition = getRequestPartition(enrichedRR);
for (String nodeName : enrichedRR.getNodeLocations()) {
RemoteNode remoteNode = allNodes.get(nodeName);
if (remoteNode != null) {
if (remoteNode != null &&
StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
retList.add(remoteNode);
numContainers--;
}
@ -563,7 +575,7 @@ public class OpportunisticContainerAllocator {
capability, currTime + tokenExpiry,
tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
schedulerKey.getPriority(), currTime,
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
null, getRemoteNodePartition(node), ContainerType.TASK,
ExecutionType.OPPORTUNISTIC, schedulerKey.getAllocationRequestId());
byte[] pwd =
tokenSecretManager.createPassword(containerTokenIdentifier);
@ -616,4 +628,20 @@ public class OpportunisticContainerAllocator {
}
return partitionedRequests;
}
private String getRequestPartition(EnrichedResourceRequest enrichedRR) {
String partition = enrichedRR.getRequest().getNodeLabelExpression();
if (partition == null) {
partition = CommonNodeLabelsManager.NO_LABEL;
}
return partition;
}
private String getRemoteNodePartition(RemoteNode node) {
String partition = node.getNodePartition();
if (partition == null) {
partition = CommonNodeLabelsManager.NO_LABEL;
}
return partition;
}
}

View File

@ -31,6 +31,7 @@ message RemoteNodeProto {
optional NodeIdProto node_id = 1;
optional string http_address = 2;
optional string rack_name = 3;
optional string node_partition = 4;
}
message RegisterDistributedSchedulingAMResponseProto {

View File

@ -596,4 +596,41 @@ public class TestOpportunisticContainerAllocator {
}
Assert.assertEquals(100, containers.size());
}
@Test
public void testAllocationWithNodeLabels() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs =
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
"*", Resources.createResource(1 * GB), 1, true, "label",
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "h1:1234", "/r1")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
/* Since there is no node satisfying node label constraints, requests
won't get fulfilled.
*/
Assert.assertEquals(0, containers.size());
Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size());
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "h1:1234", "/r1",
"label")));
containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
Assert.assertEquals(1, containers.size());
Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
@ -174,6 +175,16 @@ public class OpportunisticContainerAllocatorAMService
appAttempt.getOpportunisticContainerContext();
oppCtx.updateNodeList(getLeastLoadedNodes());
if (!partitionedAsks.getOpportunistic().isEmpty()) {
String appPartition = appAttempt.getAppAMNodePartitionName();
for (ResourceRequest req : partitionedAsks.getOpportunistic()) {
if (null == req.getNodeLabelExpression()) {
req.setNodeLabelExpression(appPartition);
}
}
}
List<Container> oppContainers =
oppContainerAllocator.allocateContainers(
request.getResourceBlacklistRequest(),
@ -436,6 +447,7 @@ public class OpportunisticContainerAllocatorAMService
if (node != null) {
RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress());
rNode.setRackName(node.getRackName());
rNode.setNodePartition(node.getPartition());
return rNode;
}
return null;

View File

@ -927,6 +927,8 @@ public class TestOpportunisticContainerAllocatorAMService {
distAllReq.getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getNodeId().getHost());
Assert.assertEquals(
"l1", dsAllocResp.getNodesForScheduling().get(1).getNodePartition());
FinishApplicationMasterResponse dsfinishResp =
new FinishApplicationMasterResponsePBImpl(
@ -1004,9 +1006,13 @@ public class TestOpportunisticContainerAllocatorAMService {
.getExecutionTypeRequest().getEnforceExecutionType());
DistributedSchedulingAllocateResponse resp = factory
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
RemoteNode remoteNode1 = RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "http://h1:4321");
RemoteNode remoteNode2 = RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "http://h2:4321");
remoteNode2.setNodePartition("l1");
resp.setNodesForScheduling(
Arrays.asList(RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "http://h1:4321")));
Arrays.asList(remoteNode1, remoteNode2));
return resp;
}
};