YARN-8367. Fix NPE in SingleConstraintAppPlacementAllocator when placement constraint in SchedulingRequest is null. Contributed by Weiwei Yang.

(Cherry picked from commit 6468071f13)
Weiwei Yang 2018-05-31 20:46:39 +08:00
parent 58e7d6c3c0
commit ec4240a7fa
2 changed files with 183 additions and 90 deletions
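For context, the failing scenario can be sketched as follows (a minimal sketch drawn from the new test case in this commit; the MockRM/MockAM setup, e.g. am1, is elided and assumed):

    // A SchedulingRequest whose PlacementConstraint is null. Before this
    // patch, SingleConstraintAppPlacementAllocator called
    // constraint.getConstraintExpr() without a null check, raising an NPE.
    SchedulingRequest sc = SchedulingRequest
        .newInstance(1, Priority.newInstance(1),
            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
            ImmutableSet.of("t2"),
            ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
            null);  // placement constraint intentionally absent
    AllocateRequest request = AllocateRequest.newBuilder()
        .schedulingRequests(ImmutableList.of(sc)).build();
    am1.allocate(request);
    // After the patch, a null constraint is tolerated and the request's
    // target allocation tags default to an empty set.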

org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -238,110 +239,118 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
           "Only GUARANTEED execution type is supported.");
     }
 
-    PlacementConstraint constraint =
-        newSchedulingRequest.getPlacementConstraint();
-
-    // We only accept SingleConstraint
-    PlacementConstraint.AbstractConstraint ac = constraint.getConstraintExpr();
-    if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
-      throwExceptionWithMetaInfo(
-          "Only accepts " + PlacementConstraint.SingleConstraint.class.getName()
-              + " as constraint-expression. Rejecting the new added "
-              + "constraint-expression.class=" + ac.getClass().getName());
-    }
-
-    PlacementConstraint.SingleConstraint singleConstraint =
-        (PlacementConstraint.SingleConstraint) ac;
-
-    // Make sure it is an anti-affinity request (actually this implementation
-    // should be able to support both affinity / anti-affinity without much
-    // effort. Considering potential test effort required. Limit to
-    // anti-affinity to intra-app and scope is node.
-    if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) {
-      throwExceptionWithMetaInfo(
-          "Only support scope=" + PlacementConstraints.NODE
-              + "now. PlacementConstraint=" + singleConstraint);
-    }
-
-    if (singleConstraint.getMinCardinality() != 0
-        || singleConstraint.getMaxCardinality() != 0) {
-      throwExceptionWithMetaInfo(
-          "Only support anti-affinity, which is: minCardinality=0, "
-              + "maxCardinality=1");
-    }
-
-    Set<PlacementConstraint.TargetExpression> targetExpressionSet =
-        singleConstraint.getTargetExpressions();
-    if (targetExpressionSet == null || targetExpressionSet.isEmpty()) {
-      throwExceptionWithMetaInfo(
-          "TargetExpression should not be null or empty");
-    }
-
-    // Set node partition
+    // Node partition
     String nodePartition = null;
     // Target allocation tags
     Set<String> targetAllocationTags = null;
 
-    for (PlacementConstraint.TargetExpression targetExpression : targetExpressionSet) {
-      // Handle node partition
-      if (targetExpression.getTargetType().equals(
-          PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) {
-        // For node attribute target, we only support Partition now. And once
-        // YARN-3409 is merged, we will support node attribute.
-        if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) {
-          throwExceptionWithMetaInfo("When TargetType="
-              + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
-              + " only " + NODE_PARTITION + " is accepted as TargetKey.");
-        }
-
-        if (nodePartition != null) {
-          // This means we have duplicated node partition entry inside placement
-          // constraint, which might be set by mistake.
-          throwExceptionWithMetaInfo(
-              "Only one node partition targetExpression is allowed");
-        }
-
-        Set<String> values = targetExpression.getTargetValues();
-        if (values == null || values.isEmpty()) {
-          nodePartition = RMNodeLabelsManager.NO_LABEL;
-          continue;
-        }
-
-        if (values.size() > 1) {
-          throwExceptionWithMetaInfo("Inside one targetExpression, we only "
-              + "support affinity to at most one node partition now");
-        }
-
-        nodePartition = values.iterator().next();
-      } else if (targetExpression.getTargetType().equals(
-          PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) {
-        // Handle allocation tags
-        if (targetAllocationTags != null) {
-          // This means we have duplicated AllocationTag expressions entries
-          // inside placement constraint, which might be set by mistake.
-          throwExceptionWithMetaInfo(
-              "Only one AllocationTag targetExpression is allowed");
-        }
-
-        if (targetExpression.getTargetValues() == null || targetExpression
-            .getTargetValues().isEmpty()) {
-          throwExceptionWithMetaInfo("Failed to find allocation tags from "
-              + "TargetExpressions or couldn't find self-app target.");
-        }
-
-        targetAllocationTags = new HashSet<>(
-            targetExpression.getTargetValues());
-      }
-    }
-
-    if (targetAllocationTags == null) {
-      // That means we don't have ALLOCATION_TAG specified
-      throwExceptionWithMetaInfo(
-          "Couldn't find target expression with type == ALLOCATION_TAG, it is "
-              + "required to include one and only one target expression with "
-              + "type == ALLOCATION_TAG");
-    }
-
+    PlacementConstraint constraint =
+        newSchedulingRequest.getPlacementConstraint();
+
+    if (constraint != null) {
+      // We only accept SingleConstraint
+      PlacementConstraint.AbstractConstraint ac = constraint
+          .getConstraintExpr();
+      if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
+        throwExceptionWithMetaInfo("Only accepts "
+            + PlacementConstraint.SingleConstraint.class.getName()
+            + " as constraint-expression. Rejecting the new added "
+            + "constraint-expression.class=" + ac.getClass().getName());
+      }
+
+      PlacementConstraint.SingleConstraint singleConstraint =
+          (PlacementConstraint.SingleConstraint) ac;
+
+      // Make sure it is an anti-affinity request (actually this implementation
+      // should be able to support both affinity / anti-affinity without much
+      // effort. Considering potential test effort required. Limit to
+      // anti-affinity to intra-app and scope is node.
+      if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) {
+        throwExceptionWithMetaInfo(
+            "Only support scope=" + PlacementConstraints.NODE
+                + "now. PlacementConstraint=" + singleConstraint);
+      }
+
+      if (singleConstraint.getMinCardinality() != 0
+          || singleConstraint.getMaxCardinality() != 0) {
+        throwExceptionWithMetaInfo(
+            "Only support anti-affinity, which is: minCardinality=0, "
+                + "maxCardinality=1");
+      }
+
+      Set<PlacementConstraint.TargetExpression> targetExpressionSet =
+          singleConstraint.getTargetExpressions();
+      if (targetExpressionSet == null || targetExpressionSet.isEmpty()) {
+        throwExceptionWithMetaInfo(
+            "TargetExpression should not be null or empty");
+      }
+
+      for (PlacementConstraint.TargetExpression targetExpression :
+          targetExpressionSet) {
+        // Handle node partition
+        if (targetExpression.getTargetType().equals(
+            PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) {
+          // For node attribute target, we only support Partition now. And once
+          // YARN-3409 is merged, we will support node attribute.
+          if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) {
+            throwExceptionWithMetaInfo("When TargetType="
+                + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
+                + " only " + NODE_PARTITION + " is accepted as TargetKey.");
+          }
+
+          if (nodePartition != null) {
+            // This means we have duplicated node partition entry
+            // inside placement constraint, which might be set by mistake.
+            throwExceptionWithMetaInfo(
+                "Only one node partition targetExpression is allowed");
+          }
+
+          Set<String> values = targetExpression.getTargetValues();
+          if (values == null || values.isEmpty()) {
+            nodePartition = RMNodeLabelsManager.NO_LABEL;
+            continue;
+          }
+
+          if (values.size() > 1) {
+            throwExceptionWithMetaInfo("Inside one targetExpression, we only "
+                + "support affinity to at most one node partition now");
+          }
+
+          nodePartition = values.iterator().next();
+        } else if (targetExpression.getTargetType().equals(
+            PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) {
+          // Handle allocation tags
+          if (targetAllocationTags != null) {
+            // This means we have duplicated AllocationTag expressions entries
+            // inside placement constraint, which might be set by mistake.
+            throwExceptionWithMetaInfo(
+                "Only one AllocationTag targetExpression is allowed");
+          }
+
+          if (targetExpression.getTargetValues() == null ||
+              targetExpression.getTargetValues().isEmpty()) {
+            throwExceptionWithMetaInfo("Failed to find allocation tags from "
+                + "TargetExpressions or couldn't find self-app target.");
+          }
+
+          targetAllocationTags = new HashSet<>(
+              targetExpression.getTargetValues());
+        }
+      }
+
+      if (targetAllocationTags == null) {
+        // That means we don't have ALLOCATION_TAG specified
+        throwExceptionWithMetaInfo(
+            "Couldn't find target expression with type == ALLOCATION_TAG,"
+                + " it is required to include one and only one target"
+                + " expression with type == ALLOCATION_TAG");
+      }
+    }
+
+    // If this scheduling request doesn't contain a placement constraint,
+    // we set allocation tags an empty set.
+    if (targetAllocationTags == null) {
+      targetAllocationTags = ImmutableSet.of();
+    }
+
     if (nodePartition == null) {

org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java

@@ -18,8 +18,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -39,6 +46,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.*;
+
 public class TestSchedulingRequestContainerAllocation {
   private final int GB = 1024;
@@ -393,4 +402,79 @@ public class TestSchedulingRequestContainerAllocation {
     Assert.assertTrue(caughtException);
     rm1.close();
   }
+
+  @Test
+  public void testSchedulingRequestWithNullConstraint() throws Exception {
+    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+        new Configuration());
+    csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    // 4 NMs.
+    MockNM[] nms = new MockNM[4];
+    RMNode[] rmNodes = new RMNode[4];
+    for (int i = 0; i < 4; i++) {
+      nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+    }
+
+    // app1 -> c
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    PlacementConstraint constraint = PlacementConstraints
+        .targetNotIn("node", allocationTag("t1"))
+        .build();
+    SchedulingRequest sc = SchedulingRequest
+        .newInstance(0, Priority.newInstance(1),
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
+            ImmutableSet.of("t1"),
+            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
+            constraint);
+    AllocateRequest request = AllocateRequest.newBuilder()
+        .schedulingRequests(ImmutableList.of(sc)).build();
+    am1.allocate(request);
+
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    FiCaSchedulerApp schedApp = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(2, schedApp.getLiveContainers().size());
+
+    // Send another request with null placement constraint,
+    // ensure there is no NPE while handling this request.
+    sc = SchedulingRequest
+        .newInstance(1, Priority.newInstance(1),
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
+            ImmutableSet.of("t2"),
+            ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
+            null);
+    AllocateRequest request1 = AllocateRequest.newBuilder()
+        .schedulingRequests(ImmutableList.of(sc)).build();
+    am1.allocate(request1);
+
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    Assert.assertEquals(4, schedApp.getLiveContainers().size());
+
+    rm1.close();
+  }
 }