YARN-8367. Fix NPE in SingleConstraintAppPlacementAllocator when placement constraint in SchedulingRequest is null. Contributed by Weiwei Yang.
This commit is contained in:
parent
d1e2b80980
commit
6468071f13
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.commons.collections.IteratorUtils;
|
import org.apache.commons.collections.IteratorUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -238,110 +239,118 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
|
||||||
"Only GUARANTEED execution type is supported.");
|
"Only GUARANTEED execution type is supported.");
|
||||||
}
|
}
|
||||||
|
|
||||||
PlacementConstraint constraint =
|
// Node partition
|
||||||
newSchedulingRequest.getPlacementConstraint();
|
|
||||||
|
|
||||||
// We only accept SingleConstraint
|
|
||||||
PlacementConstraint.AbstractConstraint ac = constraint.getConstraintExpr();
|
|
||||||
if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
|
|
||||||
throwExceptionWithMetaInfo(
|
|
||||||
"Only accepts " + PlacementConstraint.SingleConstraint.class.getName()
|
|
||||||
+ " as constraint-expression. Rejecting the new added "
|
|
||||||
+ "constraint-expression.class=" + ac.getClass().getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
PlacementConstraint.SingleConstraint singleConstraint =
|
|
||||||
(PlacementConstraint.SingleConstraint) ac;
|
|
||||||
|
|
||||||
// Make sure it is an anti-affinity request (actually this implementation
|
|
||||||
// should be able to support both affinity / anti-affinity without much
|
|
||||||
// effort. Considering potential test effort required. Limit to
|
|
||||||
// anti-affinity to intra-app and scope is node.
|
|
||||||
if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) {
|
|
||||||
throwExceptionWithMetaInfo(
|
|
||||||
"Only support scope=" + PlacementConstraints.NODE
|
|
||||||
+ "now. PlacementConstraint=" + singleConstraint);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (singleConstraint.getMinCardinality() != 0
|
|
||||||
|| singleConstraint.getMaxCardinality() != 0) {
|
|
||||||
throwExceptionWithMetaInfo(
|
|
||||||
"Only support anti-affinity, which is: minCardinality=0, "
|
|
||||||
+ "maxCardinality=1");
|
|
||||||
}
|
|
||||||
|
|
||||||
Set<PlacementConstraint.TargetExpression> targetExpressionSet =
|
|
||||||
singleConstraint.getTargetExpressions();
|
|
||||||
if (targetExpressionSet == null || targetExpressionSet.isEmpty()) {
|
|
||||||
throwExceptionWithMetaInfo(
|
|
||||||
"TargetExpression should not be null or empty");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set node partition
|
|
||||||
String nodePartition = null;
|
String nodePartition = null;
|
||||||
|
|
||||||
// Target allocation tags
|
// Target allocation tags
|
||||||
Set<String> targetAllocationTags = null;
|
Set<String> targetAllocationTags = null;
|
||||||
|
|
||||||
for (PlacementConstraint.TargetExpression targetExpression : targetExpressionSet) {
|
PlacementConstraint constraint =
|
||||||
// Handle node partition
|
newSchedulingRequest.getPlacementConstraint();
|
||||||
if (targetExpression.getTargetType().equals(
|
|
||||||
PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) {
|
|
||||||
// For node attribute target, we only support Partition now. And once
|
|
||||||
// YARN-3409 is merged, we will support node attribute.
|
|
||||||
if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) {
|
|
||||||
throwExceptionWithMetaInfo("When TargetType="
|
|
||||||
+ PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
|
|
||||||
+ " only " + NODE_PARTITION + " is accepted as TargetKey.");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nodePartition != null) {
|
if (constraint != null) {
|
||||||
// This means we have duplicated node partition entry inside placement
|
// We only accept SingleConstraint
|
||||||
// constraint, which might be set by mistake.
|
PlacementConstraint.AbstractConstraint ac = constraint
|
||||||
throwExceptionWithMetaInfo(
|
.getConstraintExpr();
|
||||||
"Only one node partition targetExpression is allowed");
|
if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
|
||||||
}
|
throwExceptionWithMetaInfo("Only accepts "
|
||||||
|
+ PlacementConstraint.SingleConstraint.class.getName()
|
||||||
|
+ " as constraint-expression. Rejecting the new added "
|
||||||
|
+ "constraint-expression.class=" + ac.getClass().getName());
|
||||||
|
}
|
||||||
|
|
||||||
Set<String> values = targetExpression.getTargetValues();
|
PlacementConstraint.SingleConstraint singleConstraint =
|
||||||
if (values == null || values.isEmpty()) {
|
(PlacementConstraint.SingleConstraint) ac;
|
||||||
nodePartition = RMNodeLabelsManager.NO_LABEL;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (values.size() > 1) {
|
// Make sure it is an anti-affinity request (actually this implementation
|
||||||
throwExceptionWithMetaInfo("Inside one targetExpression, we only "
|
// should be able to support both affinity / anti-affinity without much
|
||||||
+ "support affinity to at most one node partition now");
|
// effort. Considering potential test effort required. Limit to
|
||||||
}
|
// anti-affinity to intra-app and scope is node.
|
||||||
|
if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) {
|
||||||
|
throwExceptionWithMetaInfo(
|
||||||
|
"Only support scope=" + PlacementConstraints.NODE
|
||||||
|
+ "now. PlacementConstraint=" + singleConstraint);
|
||||||
|
}
|
||||||
|
|
||||||
nodePartition = values.iterator().next();
|
if (singleConstraint.getMinCardinality() != 0
|
||||||
} else if (targetExpression.getTargetType().equals(
|
|| singleConstraint.getMaxCardinality() != 0) {
|
||||||
PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) {
|
throwExceptionWithMetaInfo(
|
||||||
// Handle allocation tags
|
"Only support anti-affinity, which is: minCardinality=0, "
|
||||||
if (targetAllocationTags != null) {
|
+ "maxCardinality=1");
|
||||||
// This means we have duplicated AllocationTag expressions entries
|
}
|
||||||
// inside placement constraint, which might be set by mistake.
|
|
||||||
throwExceptionWithMetaInfo(
|
|
||||||
"Only one AllocationTag targetExpression is allowed");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (targetExpression.getTargetValues() == null || targetExpression
|
Set<PlacementConstraint.TargetExpression> targetExpressionSet =
|
||||||
.getTargetValues().isEmpty()) {
|
singleConstraint.getTargetExpressions();
|
||||||
throwExceptionWithMetaInfo("Failed to find allocation tags from "
|
if (targetExpressionSet == null || targetExpressionSet.isEmpty()) {
|
||||||
+ "TargetExpressions or couldn't find self-app target.");
|
throwExceptionWithMetaInfo(
|
||||||
}
|
"TargetExpression should not be null or empty");
|
||||||
|
}
|
||||||
|
|
||||||
targetAllocationTags = new HashSet<>(
|
for (PlacementConstraint.TargetExpression targetExpression :
|
||||||
targetExpression.getTargetValues());
|
targetExpressionSet) {
|
||||||
|
// Handle node partition
|
||||||
|
if (targetExpression.getTargetType().equals(
|
||||||
|
PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) {
|
||||||
|
// For node attribute target, we only support Partition now. And once
|
||||||
|
// YARN-3409 is merged, we will support node attribute.
|
||||||
|
if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) {
|
||||||
|
throwExceptionWithMetaInfo("When TargetType="
|
||||||
|
+ PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
|
||||||
|
+ " only " + NODE_PARTITION + " is accepted as TargetKey.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nodePartition != null) {
|
||||||
|
// This means we have duplicated node partition entry
|
||||||
|
// inside placement constraint, which might be set by mistake.
|
||||||
|
throwExceptionWithMetaInfo(
|
||||||
|
"Only one node partition targetExpression is allowed");
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<String> values = targetExpression.getTargetValues();
|
||||||
|
if (values == null || values.isEmpty()) {
|
||||||
|
nodePartition = RMNodeLabelsManager.NO_LABEL;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (values.size() > 1) {
|
||||||
|
throwExceptionWithMetaInfo("Inside one targetExpression, we only "
|
||||||
|
+ "support affinity to at most one node partition now");
|
||||||
|
}
|
||||||
|
|
||||||
|
nodePartition = values.iterator().next();
|
||||||
|
} else if (targetExpression.getTargetType().equals(
|
||||||
|
PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) {
|
||||||
|
// Handle allocation tags
|
||||||
|
if (targetAllocationTags != null) {
|
||||||
|
// This means we have duplicated AllocationTag expressions entries
|
||||||
|
// inside placement constraint, which might be set by mistake.
|
||||||
|
throwExceptionWithMetaInfo(
|
||||||
|
"Only one AllocationTag targetExpression is allowed");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (targetExpression.getTargetValues() == null ||
|
||||||
|
targetExpression.getTargetValues().isEmpty()) {
|
||||||
|
throwExceptionWithMetaInfo("Failed to find allocation tags from "
|
||||||
|
+ "TargetExpressions or couldn't find self-app target.");
|
||||||
|
}
|
||||||
|
|
||||||
|
targetAllocationTags = new HashSet<>(
|
||||||
|
targetExpression.getTargetValues());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (targetAllocationTags == null) {
|
||||||
|
// That means we don't have ALLOCATION_TAG specified
|
||||||
|
throwExceptionWithMetaInfo(
|
||||||
|
"Couldn't find target expression with type == ALLOCATION_TAG,"
|
||||||
|
+ " it is required to include one and only one target"
|
||||||
|
+ " expression with type == ALLOCATION_TAG");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If this scheduling request doesn't contain a placement constraint,
|
||||||
|
// we set allocation tags an empty set.
|
||||||
if (targetAllocationTags == null) {
|
if (targetAllocationTags == null) {
|
||||||
// That means we don't have ALLOCATION_TAG specified
|
targetAllocationTags = ImmutableSet.of();
|
||||||
throwExceptionWithMetaInfo(
|
|
||||||
"Couldn't find target expression with type == ALLOCATION_TAG, it is "
|
|
||||||
+ "required to include one and only one target expression with "
|
|
||||||
+ "type == ALLOCATION_TAG");
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nodePartition == null) {
|
if (nodePartition == null) {
|
||||||
|
|
|
@ -18,8 +18,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||||
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
@ -39,6 +46,8 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.*;
|
||||||
|
|
||||||
public class TestSchedulingRequestContainerAllocation {
|
public class TestSchedulingRequestContainerAllocation {
|
||||||
private final int GB = 1024;
|
private final int GB = 1024;
|
||||||
|
|
||||||
|
@ -393,4 +402,79 @@ public class TestSchedulingRequestContainerAllocation {
|
||||||
Assert.assertTrue(caughtException);
|
Assert.assertTrue(caughtException);
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSchedulingRequestWithNullConstraint() throws Exception {
|
||||||
|
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
|
||||||
|
new Configuration());
|
||||||
|
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
||||||
|
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
||||||
|
|
||||||
|
// inject node label manager
|
||||||
|
MockRM rm1 = new MockRM(csConf) {
|
||||||
|
@Override
|
||||||
|
public RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm1.start();
|
||||||
|
|
||||||
|
// 4 NMs.
|
||||||
|
MockNM[] nms = new MockNM[4];
|
||||||
|
RMNode[] rmNodes = new RMNode[4];
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
|
||||||
|
rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
|
||||||
|
}
|
||||||
|
|
||||||
|
// app1 -> c
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
|
||||||
|
PlacementConstraint constraint = PlacementConstraints
|
||||||
|
.targetNotIn("node", allocationTag("t1"))
|
||||||
|
.build();
|
||||||
|
SchedulingRequest sc = SchedulingRequest
|
||||||
|
.newInstance(0, Priority.newInstance(1),
|
||||||
|
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
|
||||||
|
ImmutableSet.of("t1"),
|
||||||
|
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
|
||||||
|
constraint);
|
||||||
|
AllocateRequest request = AllocateRequest.newBuilder()
|
||||||
|
.schedulingRequests(ImmutableList.of(sc)).build();
|
||||||
|
am1.allocate(request);
|
||||||
|
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
FiCaSchedulerApp schedApp = cs.getApplicationAttempt(
|
||||||
|
am1.getApplicationAttemptId());
|
||||||
|
Assert.assertEquals(2, schedApp.getLiveContainers().size());
|
||||||
|
|
||||||
|
|
||||||
|
// Send another request with null placement constraint,
|
||||||
|
// ensure there is no NPE while handling this request.
|
||||||
|
sc = SchedulingRequest
|
||||||
|
.newInstance(1, Priority.newInstance(1),
|
||||||
|
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
|
||||||
|
ImmutableSet.of("t2"),
|
||||||
|
ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
|
||||||
|
null);
|
||||||
|
AllocateRequest request1 = AllocateRequest.newBuilder()
|
||||||
|
.schedulingRequests(ImmutableList.of(sc)).build();
|
||||||
|
am1.allocate(request1);
|
||||||
|
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(4, schedApp.getLiveContainers().size());
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue