YARN-8350. NPE in service AM related to placement policy. Contributed by Gour Saha
(cherry picked from commit 778a4a24be
)
This commit is contained in:
parent
7bd5d79177
commit
58e7d6c3c0
|
@ -694,62 +694,66 @@ public class Component implements EventHandler<ComponentEvent> {
|
|||
// composite constraints then this AND-ed composite constraint is not
|
||||
// used.
|
||||
PlacementConstraint finalConstraint = null;
|
||||
for (org.apache.hadoop.yarn.service.api.records.PlacementConstraint
|
||||
yarnServiceConstraint : placementPolicy.getConstraints()) {
|
||||
List<TargetExpression> targetExpressions = new ArrayList<>();
|
||||
// Currently only intra-application allocation tags are supported.
|
||||
if (!yarnServiceConstraint.getTargetTags().isEmpty()) {
|
||||
targetExpressions.add(PlacementTargets.allocationTag(
|
||||
yarnServiceConstraint.getTargetTags().toArray(new String[0])));
|
||||
if (placementPolicy != null) {
|
||||
for (org.apache.hadoop.yarn.service.api.records.PlacementConstraint
|
||||
yarnServiceConstraint : placementPolicy.getConstraints()) {
|
||||
List<TargetExpression> targetExpressions = new ArrayList<>();
|
||||
// Currently only intra-application allocation tags are supported.
|
||||
if (!yarnServiceConstraint.getTargetTags().isEmpty()) {
|
||||
targetExpressions.add(PlacementTargets.allocationTag(
|
||||
yarnServiceConstraint.getTargetTags().toArray(new String[0])));
|
||||
}
|
||||
// Add all node attributes
|
||||
for (Map.Entry<String, List<String>> attribute : yarnServiceConstraint
|
||||
.getNodeAttributes().entrySet()) {
|
||||
targetExpressions
|
||||
.add(PlacementTargets.nodeAttribute(attribute.getKey(),
|
||||
attribute.getValue().toArray(new String[0])));
|
||||
}
|
||||
// Add all node partitions
|
||||
if (!yarnServiceConstraint.getNodePartitions().isEmpty()) {
|
||||
targetExpressions
|
||||
.add(PlacementTargets.nodePartition(yarnServiceConstraint
|
||||
.getNodePartitions().toArray(new String[0])));
|
||||
}
|
||||
PlacementConstraint constraint = null;
|
||||
switch (yarnServiceConstraint.getType()) {
|
||||
case AFFINITY:
|
||||
constraint = PlacementConstraints
|
||||
.targetIn(yarnServiceConstraint.getScope().getValue(),
|
||||
targetExpressions.toArray(new TargetExpression[0]))
|
||||
.build();
|
||||
break;
|
||||
case ANTI_AFFINITY:
|
||||
constraint = PlacementConstraints
|
||||
.targetNotIn(yarnServiceConstraint.getScope().getValue(),
|
||||
targetExpressions.toArray(new TargetExpression[0]))
|
||||
.build();
|
||||
break;
|
||||
case AFFINITY_WITH_CARDINALITY:
|
||||
constraint = PlacementConstraints.targetCardinality(
|
||||
yarnServiceConstraint.getScope().name().toLowerCase(),
|
||||
yarnServiceConstraint.getMinCardinality() == null ? 0
|
||||
: yarnServiceConstraint.getMinCardinality().intValue(),
|
||||
yarnServiceConstraint.getMaxCardinality() == null
|
||||
? Integer.MAX_VALUE
|
||||
: yarnServiceConstraint.getMaxCardinality().intValue(),
|
||||
targetExpressions.toArray(new TargetExpression[0])).build();
|
||||
break;
|
||||
}
|
||||
// The default AND-ed final composite constraint
|
||||
if (finalConstraint != null) {
|
||||
finalConstraint = PlacementConstraints
|
||||
.and(constraint.getConstraintExpr(),
|
||||
finalConstraint.getConstraintExpr())
|
||||
.build();
|
||||
} else {
|
||||
finalConstraint = constraint;
|
||||
}
|
||||
LOG.debug("[COMPONENT {}] Placement constraint: {}",
|
||||
componentSpec.getName(),
|
||||
constraint.getConstraintExpr().toString());
|
||||
}
|
||||
// Add all node attributes
|
||||
for (Map.Entry<String, List<String>> attribute : yarnServiceConstraint
|
||||
.getNodeAttributes().entrySet()) {
|
||||
targetExpressions.add(PlacementTargets.nodeAttribute(
|
||||
attribute.getKey(), attribute.getValue().toArray(new String[0])));
|
||||
}
|
||||
// Add all node partitions
|
||||
if (!yarnServiceConstraint.getNodePartitions().isEmpty()) {
|
||||
targetExpressions
|
||||
.add(PlacementTargets.nodePartition(yarnServiceConstraint
|
||||
.getNodePartitions().toArray(new String[0])));
|
||||
}
|
||||
PlacementConstraint constraint = null;
|
||||
switch (yarnServiceConstraint.getType()) {
|
||||
case AFFINITY:
|
||||
constraint = PlacementConstraints
|
||||
.targetIn(yarnServiceConstraint.getScope().getValue(),
|
||||
targetExpressions.toArray(new TargetExpression[0]))
|
||||
.build();
|
||||
break;
|
||||
case ANTI_AFFINITY:
|
||||
constraint = PlacementConstraints
|
||||
.targetNotIn(yarnServiceConstraint.getScope().getValue(),
|
||||
targetExpressions.toArray(new TargetExpression[0]))
|
||||
.build();
|
||||
break;
|
||||
case AFFINITY_WITH_CARDINALITY:
|
||||
constraint = PlacementConstraints.targetCardinality(
|
||||
yarnServiceConstraint.getScope().name().toLowerCase(),
|
||||
yarnServiceConstraint.getMinCardinality() == null ? 0
|
||||
: yarnServiceConstraint.getMinCardinality().intValue(),
|
||||
yarnServiceConstraint.getMaxCardinality() == null
|
||||
? Integer.MAX_VALUE
|
||||
: yarnServiceConstraint.getMaxCardinality().intValue(),
|
||||
targetExpressions.toArray(new TargetExpression[0])).build();
|
||||
break;
|
||||
}
|
||||
// The default AND-ed final composite constraint
|
||||
if (finalConstraint != null) {
|
||||
finalConstraint = PlacementConstraints
|
||||
.and(constraint.getConstraintExpr(),
|
||||
finalConstraint.getConstraintExpr())
|
||||
.build();
|
||||
} else {
|
||||
finalConstraint = constraint;
|
||||
}
|
||||
LOG.debug("[COMPONENT {}] Placement constraint: {}",
|
||||
componentSpec.getName(), constraint.getConstraintExpr().toString());
|
||||
}
|
||||
ResourceSizing resourceSizing = ResourceSizing.newInstance((int) count,
|
||||
resource);
|
||||
|
|
|
@ -91,6 +91,14 @@ public interface RestApiErrorMessages {
|
|||
|
||||
String ERROR_QUICKLINKS_FOR_COMP_INVALID = "Quicklinks specified at"
|
||||
+ " component level, needs corresponding values set at service level";
|
||||
// Note: %sin is not a typo. Constraint name is optional so the error messages
|
||||
// below handle that scenario by adding a space if name is specified.
|
||||
String ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL = "Type not specified "
|
||||
+ "for constraint %sin placement policy of component %s.";
|
||||
String ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL = "Scope not specified "
|
||||
+ "for constraint %sin placement policy of component %s.";
|
||||
String ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL = "Tag(s) not specified "
|
||||
+ "for constraint %sin placement policy of component %s.";
|
||||
String ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME = "Invalid target tag %s "
|
||||
+ "specified in placement policy of component %s. For now, target tags "
|
||||
+ "support self reference only. Specifying anything other than its "
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.service.api.records.Component;
|
|||
import org.apache.hadoop.yarn.service.api.records.Configuration;
|
||||
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
|
||||
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
|
||||
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
|
||||
import org.apache.hadoop.yarn.service.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.service.exceptions.SliderException;
|
||||
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
|
||||
|
@ -314,9 +315,28 @@ public class ServiceApiUtil {
|
|||
private static void validatePlacementPolicy(List<Component> components,
|
||||
Set<String> componentNames) {
|
||||
for (Component comp : components) {
|
||||
if (comp.getPlacementPolicy() != null) {
|
||||
for (PlacementConstraint constraint : comp.getPlacementPolicy()
|
||||
PlacementPolicy placementPolicy = comp.getPlacementPolicy();
|
||||
if (placementPolicy != null) {
|
||||
for (PlacementConstraint constraint : placementPolicy
|
||||
.getConstraints()) {
|
||||
if (constraint.getType() == null) {
|
||||
throw new IllegalArgumentException(String.format(
|
||||
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL,
|
||||
constraint.getName() == null ? "" : constraint.getName() + " ",
|
||||
comp.getName()));
|
||||
}
|
||||
if (constraint.getScope() == null) {
|
||||
throw new IllegalArgumentException(String.format(
|
||||
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL,
|
||||
constraint.getName() == null ? "" : constraint.getName() + " ",
|
||||
comp.getName()));
|
||||
}
|
||||
if (constraint.getTargetTags().isEmpty()) {
|
||||
throw new IllegalArgumentException(String.format(
|
||||
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL,
|
||||
constraint.getName() == null ? "" : constraint.getName() + " ",
|
||||
comp.getName()));
|
||||
}
|
||||
for (String targetTag : constraint.getTargetTags()) {
|
||||
if (!comp.getName().equals(targetTag)) {
|
||||
throw new IllegalArgumentException(String.format(
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.service.api.records.Component;
|
|||
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
|
||||
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
|
||||
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
|
||||
import org.apache.hadoop.yarn.service.api.records.PlacementScope;
|
||||
import org.apache.hadoop.yarn.service.api.records.PlacementType;
|
||||
import org.apache.hadoop.yarn.service.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
|
||||
|
@ -503,13 +505,48 @@ public class TestServiceApiUtil {
|
|||
PlacementPolicy pp = new PlacementPolicy();
|
||||
PlacementConstraint pc = new PlacementConstraint();
|
||||
pc.setName("CA1");
|
||||
pc.setTargetTags(Collections.singletonList("comp-invalid"));
|
||||
pp.setConstraints(Collections.singletonList(pc));
|
||||
comp.setPlacementPolicy(pp);
|
||||
|
||||
try {
|
||||
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||
Assert.fail(EXCEPTION_PREFIX + "service with empty placement");
|
||||
Assert.fail(EXCEPTION_PREFIX + "constraint with no type");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals(String.format(
|
||||
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL,
|
||||
"CA1 ", "comp-a"), e.getMessage());
|
||||
}
|
||||
|
||||
// Set the type
|
||||
pc.setType(PlacementType.ANTI_AFFINITY);
|
||||
|
||||
try {
|
||||
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||
Assert.fail(EXCEPTION_PREFIX + "constraint with no scope");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals(String.format(
|
||||
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL,
|
||||
"CA1 ", "comp-a"), e.getMessage());
|
||||
}
|
||||
|
||||
// Set the scope
|
||||
pc.setScope(PlacementScope.NODE);
|
||||
|
||||
try {
|
||||
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||
Assert.fail(EXCEPTION_PREFIX + "constraint with no tag(s)");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals(String.format(
|
||||
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL,
|
||||
"CA1 ", "comp-a"), e.getMessage());
|
||||
}
|
||||
|
||||
// Set a target tag - but an invalid one
|
||||
pc.setTargetTags(Collections.singletonList("comp-invalid"));
|
||||
|
||||
try {
|
||||
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||
Assert.fail(EXCEPTION_PREFIX + "constraint with invalid tag name");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals(
|
||||
String.format(
|
||||
|
@ -518,9 +555,10 @@ public class TestServiceApiUtil {
|
|||
e.getMessage());
|
||||
}
|
||||
|
||||
// Set valid target tags now
|
||||
pc.setTargetTags(Collections.singletonList("comp-a"));
|
||||
|
||||
// now it should succeed
|
||||
// Finally it should succeed
|
||||
try {
|
||||
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||
} catch (IllegalArgumentException e) {
|
||||
|
|
Loading…
Reference in New Issue