YARN-8721. Relax NE node-attribute check when attribute doesn't exist on a node. Contributed by Sunil Govindan.

This commit is contained in:
Weiwei Yang 2018-08-28 17:25:19 +08:00 committed by Sunil G
parent 67ae81f0e0
commit 52194351e7
6 changed files with 287 additions and 53 deletions

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
/**
@ -126,4 +127,10 @@ public abstract Map<String, Set<NodeAttribute>> getNodesToAttributes(
// futuristic
// public set<NodeId> getNodesMatchingExpression(String nodeLabelExp);
/**
* Refresh node attributes on a given node during RM recovery.
* @param nodeId Node Id
*/
public abstract void refreshNodeAttributesToScheduler(NodeId nodeId);
}

View File

@ -725,4 +725,27 @@ protected void serviceStop() throws Exception {
public void setRMContext(RMContext context) {
this.rmContext = context;
}
/**
* Refresh node attributes on a given node during RM recovery.
* @param nodeId Node Id
*/
public void refreshNodeAttributesToScheduler(NodeId nodeId) {
String hostName = nodeId.getHost();
Map<String, Set<NodeAttribute>> newNodeToAttributesMap =
new HashMap<>();
Host host = nodeCollections.get(hostName);
if (host == null || host.attributes == null) {
return;
}
newNodeToAttributesMap.put(hostName, host.attributes.keySet());
// Notify RM
if (rmContext != null && rmContext.getDispatcher() != null) {
LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap
.values());
rmContext.getDispatcher().getEventHandler().handle(
new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
}
}
}

View File

@ -1989,6 +1989,12 @@ private void addNode(RMNode nodeManager) {
schedulerNode.getTotalResource());
}
// recover attributes from store if any.
if (rmContext.getNodeAttributesManager() != null) {
rmContext.getNodeAttributesManager()
.refreshNodeAttributesToScheduler(schedulerNode.getNodeID());
}
Resource clusterResource = getClusterResource();
getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

View File

@ -113,7 +113,7 @@ private static boolean canSatisfySingleConstraintExpression(
|| maxScopeCardinality <= desiredMaxCardinality);
}
private static boolean canSatisfyNodeConstraintExpresssion(
private static boolean canSatisfyNodeConstraintExpression(
SingleConstraint sc, TargetExpression targetExpression,
SchedulerNode schedulerNode) {
Set<String> values = targetExpression.getTargetValues();
@ -138,45 +138,67 @@ private static boolean canSatisfyNodeConstraintExpresssion(
return true;
}
if (schedulerNode.getNodeAttributes() == null ||
!schedulerNode.getNodeAttributes().contains(requestAttribute)) {
if(LOG.isDebugEnabled()) {
LOG.debug("Incoming requestAttribute:" + requestAttribute
+ "is not present in " + schedulerNode.getNodeID());
}
return false;
}
boolean found = false;
for (Iterator<NodeAttribute> it = schedulerNode.getNodeAttributes()
.iterator(); it.hasNext();) {
NodeAttribute nodeAttribute = it.next();
return getNodeConstraintEvaluatedResult(schedulerNode, opCode,
requestAttribute);
}
return true;
}
private static boolean getNodeConstraintEvaluatedResult(
SchedulerNode schedulerNode,
NodeAttributeOpCode opCode, NodeAttribute requestAttribute) {
// In case, attributes in a node is empty or incoming attributes doesn't
// exist on given node, accept such nodes for scheduling if opCode is
// equals to NE. (for eg. java != 1.8 could be scheduled on a node
// where java is not configured.)
if (schedulerNode.getNodeAttributes() == null ||
!schedulerNode.getNodeAttributes().contains(requestAttribute)) {
if (opCode == NodeAttributeOpCode.NE) {
if (LOG.isDebugEnabled()) {
LOG.debug("Starting to compare Incoming requestAttribute :"
+ requestAttribute
+ " with requestAttribute value= " + requestAttribute
.getAttributeValue()
+ ", stored nodeAttribute value=" + nodeAttribute
.getAttributeValue());
LOG.debug("Incoming requestAttribute:" + requestAttribute
+ "is not present in " + schedulerNode.getNodeID()
+ ", however opcode is NE. Hence accept this node.");
}
if (requestAttribute.equals(nodeAttribute)) {
if (isOpCodeMatches(requestAttribute, nodeAttribute, opCode)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Incoming requestAttribute:" + requestAttribute
+ " matches with node:" + schedulerNode.getNodeID());
}
found = true;
return found;
return true;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Incoming requestAttribute:" + requestAttribute
+ "is not present in " + schedulerNode.getNodeID()
+ ", skip such node.");
}
return false;
}
boolean found = false;
for (Iterator<NodeAttribute> it = schedulerNode.getNodeAttributes()
.iterator(); it.hasNext();) {
NodeAttribute nodeAttribute = it.next();
if (LOG.isDebugEnabled()) {
LOG.debug("Starting to compare Incoming requestAttribute :"
+ requestAttribute
+ " with requestAttribute value= " + requestAttribute
.getAttributeValue()
+ ", stored nodeAttribute value=" + nodeAttribute
.getAttributeValue());
}
if (requestAttribute.equals(nodeAttribute)) {
if (isOpCodeMatches(requestAttribute, nodeAttribute, opCode)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Incoming requestAttribute:" + requestAttribute
+ " matches with node:" + schedulerNode.getNodeID());
}
found = true;
return found;
}
}
if (!found) {
if(LOG.isDebugEnabled()) {
LOG.info("skip this node:" + schedulerNode.getNodeID()
+ " for requestAttribute:" + requestAttribute);
}
return false;
}
if (!found) {
if (LOG.isDebugEnabled()) {
LOG.info("skip this node:" + schedulerNode.getNodeID()
+ " for requestAttribute:" + requestAttribute);
}
return false;
}
return true;
}
@ -217,7 +239,7 @@ private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
}
} else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) {
// This is a node attribute expression, check it.
if (!canSatisfyNodeConstraintExpresssion(singleConstraint, currentExp,
if (!canSatisfyNodeConstraintExpression(singleConstraint, currentExp,
schedulerNode)) {
return false;
}

View File

@ -22,16 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
@ -205,7 +196,14 @@ public static ApplicationId getMockApplicationId(int appId) {
ApplicationId applicationId = BuilderUtils.newApplicationId(0l, appId);
return ApplicationAttemptId.newInstance(applicationId, attemptId);
}
public static FiCaSchedulerNode getMockNodeWithAttributes(String host,
String rack, int port, int memory, Set<NodeAttribute> attributes) {
FiCaSchedulerNode node = getMockNode(host, rack, port, memory, 1);
when(node.getNodeAttributes()).thenReturn(attributes);
return node;
}
public static FiCaSchedulerNode getMockNode(String host, String rack,
int port, int memory) {
return getMockNode(host, rack, port, memory, 1);

View File

@ -18,14 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -44,6 +38,8 @@
import org.junit.Test;
import org.mockito.Mockito;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongBinaryOperator;
import static org.mockito.Matchers.any;
@ -326,4 +322,186 @@ public void testFunctionality() throws InvalidAllocationTagsQueryException {
Assert.assertFalse(allocator
.precheckNode(node2, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
}
@Test
public void testNodeAttributesFunctionality() {
// 1. Simple java=1.8 validation
SchedulingRequest schedulingRequest =
SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
.allocationRequestId(10L).priority(Priority.newInstance(1))
.placementConstraintExpression(PlacementConstraints
.targetNodeAttribute(PlacementConstraints.NODE,
NodeAttributeOpCode.EQ,
PlacementConstraints.PlacementTargets
.nodeAttribute("java", "1.8"),
PlacementConstraints.PlacementTargets.nodePartition(""))
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build();
allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest, false);
Set<NodeAttribute> attributes = new HashSet<>();
attributes.add(
NodeAttribute.newInstance("java", NodeAttributeType.STRING, "1.8"));
boolean result = allocator.canAllocate(NodeType.NODE_LOCAL,
TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
attributes));
Assert.assertTrue("Allocation should be success for java=1.8", result);
// 2. verify python!=3 validation
SchedulingRequest schedulingRequest2 =
SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
.allocationRequestId(10L).priority(Priority.newInstance(1))
.placementConstraintExpression(PlacementConstraints
.targetNodeAttribute(PlacementConstraints.NODE,
NodeAttributeOpCode.NE,
PlacementConstraints.PlacementTargets
.nodeAttribute("python", "3"),
PlacementConstraints.PlacementTargets.nodePartition(""))
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build();
// Create allocator
allocator = new SingleConstraintAppPlacementAllocator();
allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest2, false);
attributes = new HashSet<>();
result = allocator.canAllocate(NodeType.NODE_LOCAL,
TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
attributes));
Assert.assertTrue("Allocation should be success as python doesn't exist",
result);
// 3. verify python!=3 validation when node has python=2
allocator = new SingleConstraintAppPlacementAllocator();
allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest2, false);
attributes = new HashSet<>();
attributes.add(
NodeAttribute.newInstance("python", NodeAttributeType.STRING, "2"));
result = allocator.canAllocate(NodeType.NODE_LOCAL,
TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
attributes));
Assert.assertTrue(
"Allocation should be success as python=3 doesn't exist in node",
result);
// 4. verify python!=3 validation when node has python=3
allocator = new SingleConstraintAppPlacementAllocator();
allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest2, false);
attributes = new HashSet<>();
attributes.add(
NodeAttribute.newInstance("python", NodeAttributeType.STRING, "3"));
result = allocator.canAllocate(NodeType.NODE_LOCAL,
TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
attributes));
Assert.assertFalse("Allocation should fail as python=3 exist in node",
result);
}
@Test
public void testConjunctionNodeAttributesFunctionality() {
// 1. verify and(python!=3:java=1.8) validation when node has python=3
SchedulingRequest schedulingRequest1 =
SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
.allocationRequestId(10L).priority(Priority.newInstance(1))
.placementConstraintExpression(
PlacementConstraints.and(
PlacementConstraints
.targetNodeAttribute(PlacementConstraints.NODE,
NodeAttributeOpCode.NE,
PlacementConstraints.PlacementTargets
.nodeAttribute("python", "3")),
PlacementConstraints
.targetNodeAttribute(PlacementConstraints.NODE,
NodeAttributeOpCode.EQ,
PlacementConstraints.PlacementTargets
.nodeAttribute("java", "1.8")))
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build();
allocator = new SingleConstraintAppPlacementAllocator();
allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest1, false);
Set<NodeAttribute> attributes = new HashSet<>();
attributes.add(
NodeAttribute.newInstance("python", NodeAttributeType.STRING, "3"));
attributes.add(
NodeAttribute.newInstance("java", NodeAttributeType.STRING, "1.8"));
boolean result = allocator.canAllocate(NodeType.NODE_LOCAL,
TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
attributes));
Assert.assertFalse("Allocation should fail as python=3 exists in node",
result);
// 2. verify and(python!=3:java=1.8) validation when node has python=2
// and java=1.8
allocator = new SingleConstraintAppPlacementAllocator();
allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest1, false);
attributes = new HashSet<>();
attributes.add(
NodeAttribute.newInstance("python", NodeAttributeType.STRING, "2"));
attributes.add(
NodeAttribute.newInstance("java", NodeAttributeType.STRING, "1.8"));
result = allocator.canAllocate(NodeType.NODE_LOCAL,
TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
attributes));
Assert.assertTrue("Allocation should be success as python=2 exists in node",
result);
// 3. verify or(python!=3:java=1.8) validation when node has python=3
SchedulingRequest schedulingRequest2 =
SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
.allocationRequestId(10L).priority(Priority.newInstance(1))
.placementConstraintExpression(
PlacementConstraints.or(
PlacementConstraints
.targetNodeAttribute(PlacementConstraints.NODE,
NodeAttributeOpCode.NE,
PlacementConstraints.PlacementTargets
.nodeAttribute("python", "3")),
PlacementConstraints
.targetNodeAttribute(PlacementConstraints.NODE,
NodeAttributeOpCode.EQ,
PlacementConstraints.PlacementTargets
.nodeAttribute("java", "1.8")))
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build();
allocator = new SingleConstraintAppPlacementAllocator();
allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest2, false);
attributes = new HashSet<>();
attributes.add(
NodeAttribute.newInstance("python", NodeAttributeType.STRING, "3"));
attributes.add(
NodeAttribute.newInstance("java", NodeAttributeType.STRING, "1.8"));
result = allocator.canAllocate(NodeType.NODE_LOCAL,
TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
attributes));
Assert.assertTrue("Allocation should be success as java=1.8 exists in node",
result);
// 4. verify or(python!=3:java=1.8) validation when node has python=3
// and java=1.7.
allocator = new SingleConstraintAppPlacementAllocator();
allocator.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
allocator.updatePendingAsk(schedulerRequestKey, schedulingRequest2, false);
attributes = new HashSet<>();
attributes.add(
NodeAttribute.newInstance("python", NodeAttributeType.STRING, "3"));
attributes.add(
NodeAttribute.newInstance("java", NodeAttributeType.STRING, "1.7"));
result = allocator.canAllocate(NodeType.NODE_LOCAL,
TestUtils.getMockNodeWithAttributes("host1", "/rack1", 123, 1024,
attributes));
Assert
.assertFalse("Allocation should fail as java=1.8 doesnt exist in node",
result);
}
}