YARN-9252. Allocation Tag Namespace support in Distributed Shell. Contributed by Prabhu Joseph.
(cherry picked from commit 2b7f828d46
)
This commit is contained in:
parent
9cfa5ce0c1
commit
4d4968646a
|
@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.util.constraint;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
|
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
|
||||||
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
|
||||||
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -35,6 +38,7 @@ import java.util.List;
|
||||||
import java.util.Stack;
|
import java.util.Stack;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
@ -51,6 +55,7 @@ public final class PlacementConstraintParser {
|
||||||
private static final char KV_SPLIT_DELIM = '=';
|
private static final char KV_SPLIT_DELIM = '=';
|
||||||
private static final char BRACKET_START = '(';
|
private static final char BRACKET_START = '(';
|
||||||
private static final char BRACKET_END = ')';
|
private static final char BRACKET_END = ')';
|
||||||
|
private static final char NAMESPACE_DELIM = '/';
|
||||||
private static final String KV_NE_DELIM = "!=";
|
private static final String KV_NE_DELIM = "!=";
|
||||||
private static final String IN = "in";
|
private static final String IN = "in";
|
||||||
private static final String NOT_IN = "notin";
|
private static final String NOT_IN = "notin";
|
||||||
|
@ -104,6 +109,25 @@ public final class PlacementConstraintParser {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TargetExpression parseNameSpace(String targetTag)
|
||||||
|
throws PlacementConstraintParseException {
|
||||||
|
int i = targetTag.lastIndexOf(NAMESPACE_DELIM);
|
||||||
|
if (i != -1) {
|
||||||
|
String namespace = targetTag.substring(0, i);
|
||||||
|
for (AllocationTagNamespaceType type :
|
||||||
|
AllocationTagNamespaceType.values()) {
|
||||||
|
if (type.getTypeKeyword().equals(namespace)) {
|
||||||
|
return PlacementTargets.allocationTagWithNamespace(
|
||||||
|
namespace, targetTag.substring(i+1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new PlacementConstraintParseException(
|
||||||
|
"Invalid namespace prefix: " + namespace);
|
||||||
|
} else {
|
||||||
|
return PlacementTargets.allocationTag(targetTag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
String parseScope(String scopeString)
|
String parseScope(String scopeString)
|
||||||
throws PlacementConstraintParseException {
|
throws PlacementConstraintParseException {
|
||||||
if (scopeString.equalsIgnoreCase(SCOPE_NODE)) {
|
if (scopeString.equalsIgnoreCase(SCOPE_NODE)) {
|
||||||
|
@ -392,12 +416,11 @@ public final class PlacementConstraintParser {
|
||||||
+ constraintEntities);
|
+ constraintEntities);
|
||||||
}
|
}
|
||||||
|
|
||||||
PlacementConstraint.TargetExpression target = null;
|
TargetExpression target = null;
|
||||||
if (!constraintEntities.isEmpty()) {
|
if (!constraintEntities.isEmpty()) {
|
||||||
target = PlacementConstraints.PlacementTargets
|
target = PlacementTargets.nodeAttribute(attributeName,
|
||||||
.nodeAttribute(attributeName,
|
constraintEntities
|
||||||
constraintEntities
|
.toArray(new String[constraintEntities.size()]));
|
||||||
.toArray(new String[constraintEntities.size()]));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
placementConstraints = PlacementConstraints
|
placementConstraints = PlacementConstraints
|
||||||
|
@ -457,23 +480,20 @@ public final class PlacementConstraintParser {
|
||||||
String scope = nextToken();
|
String scope = nextToken();
|
||||||
scope = parseScope(scope);
|
scope = parseScope(scope);
|
||||||
|
|
||||||
Set<String> constraintEntities = new TreeSet<>();
|
Set<TargetExpression> targetExpressions = new HashSet<>();
|
||||||
while(hasMoreTokens()) {
|
while(hasMoreTokens()) {
|
||||||
String tag = nextToken();
|
String tag = nextToken();
|
||||||
constraintEntities.add(tag);
|
TargetExpression t = parseNameSpace(tag);
|
||||||
}
|
targetExpressions.add(t);
|
||||||
PlacementConstraint.TargetExpression target = null;
|
|
||||||
if(!constraintEntities.isEmpty()) {
|
|
||||||
target = PlacementConstraints.PlacementTargets.allocationTag(
|
|
||||||
constraintEntities
|
|
||||||
.toArray(new String[constraintEntities.size()]));
|
|
||||||
}
|
}
|
||||||
|
TargetExpression[] targetArr = targetExpressions.toArray(
|
||||||
|
new TargetExpression[targetExpressions.size()]);
|
||||||
if (op.equalsIgnoreCase(IN)) {
|
if (op.equalsIgnoreCase(IN)) {
|
||||||
placementConstraints = PlacementConstraints
|
placementConstraints = PlacementConstraints
|
||||||
.targetIn(scope, target);
|
.targetIn(scope, targetArr);
|
||||||
} else {
|
} else {
|
||||||
placementConstraints = PlacementConstraints
|
placementConstraints = PlacementConstraints
|
||||||
.targetNotIn(scope, target);
|
.targetNotIn(scope, targetArr);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new PlacementConstraintParseException(
|
throw new PlacementConstraintParseException(
|
||||||
|
@ -527,13 +547,16 @@ public final class PlacementConstraintParser {
|
||||||
String minCardinalityStr = resetElements.pop();
|
String minCardinalityStr = resetElements.pop();
|
||||||
int min = toInt(minCardinalityStr);
|
int min = toInt(minCardinalityStr);
|
||||||
|
|
||||||
ArrayList<String> targetTags = new ArrayList<>();
|
Set<TargetExpression> targetExpressions = new HashSet<>();
|
||||||
while (!resetElements.empty()) {
|
while (!resetElements.empty()) {
|
||||||
targetTags.add(resetElements.pop());
|
String tag = resetElements.pop();
|
||||||
|
TargetExpression t = parseNameSpace(tag);
|
||||||
|
targetExpressions.add(t);
|
||||||
}
|
}
|
||||||
|
TargetExpression[] targetArr = targetExpressions.toArray(
|
||||||
|
new TargetExpression[targetExpressions.size()]);
|
||||||
|
|
||||||
return PlacementConstraints.cardinality(scope, min, max,
|
return PlacementConstraints.targetCardinality(scope, min, max, targetArr);
|
||||||
targetTags.toArray(new String[targetTags.size()]));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.Constrai
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
|
||||||
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
|
||||||
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality;
|
|
||||||
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
|
||||||
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets;
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets;
|
||||||
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
|
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
|
||||||
|
@ -110,11 +109,13 @@ public class TestPlacementConstraintParser {
|
||||||
Assert.assertEquals("node", single.getScope());
|
Assert.assertEquals("node", single.getScope());
|
||||||
Assert.assertEquals(0, single.getMinCardinality());
|
Assert.assertEquals(0, single.getMinCardinality());
|
||||||
Assert.assertEquals(0, single.getMaxCardinality());
|
Assert.assertEquals(0, single.getMaxCardinality());
|
||||||
Assert.assertEquals(1, single.getTargetExpressions().size());
|
Assert.assertEquals(3, single.getTargetExpressions().size());
|
||||||
TargetExpression exp =
|
Set<TargetExpression> expectedTargetExpressions = Sets.newHashSet(
|
||||||
single.getTargetExpressions().iterator().next();
|
PlacementTargets.allocationTag("foo"),
|
||||||
Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
|
PlacementTargets.allocationTag("bar"),
|
||||||
Assert.assertEquals(3, exp.getTargetValues().size());
|
PlacementTargets.allocationTag("exp"));
|
||||||
|
Assert.assertTrue(Sets.difference(expectedTargetExpressions,
|
||||||
|
single.getTargetExpressions()).isEmpty());
|
||||||
verifyConstraintToString(expressionStr, constraint);
|
verifyConstraintToString(expressionStr, constraint);
|
||||||
|
|
||||||
// Invalid OP
|
// Invalid OP
|
||||||
|
@ -161,13 +162,13 @@ public class TestPlacementConstraintParser {
|
||||||
Assert.assertEquals("rack", single.getScope());
|
Assert.assertEquals("rack", single.getScope());
|
||||||
Assert.assertEquals(0, single.getMinCardinality());
|
Assert.assertEquals(0, single.getMinCardinality());
|
||||||
Assert.assertEquals(1, single.getMaxCardinality());
|
Assert.assertEquals(1, single.getMaxCardinality());
|
||||||
Assert.assertEquals(1, single.getTargetExpressions().size());
|
Assert.assertEquals(3, single.getTargetExpressions().size());
|
||||||
exp = single.getTargetExpressions().iterator().next();
|
Set<TargetExpression> expectedTargetExpressions = Sets.newHashSet(
|
||||||
Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
|
PlacementTargets.allocationTag("foo"),
|
||||||
Assert.assertEquals(3, exp.getTargetValues().size());
|
PlacementTargets.allocationTag("bar"),
|
||||||
Set<String> expectedTags = Sets.newHashSet("foo", "bar", "moo");
|
PlacementTargets.allocationTag("moo"));
|
||||||
Assert.assertTrue(Sets.difference(expectedTags, exp.getTargetValues())
|
Assert.assertTrue(Sets.difference(expectedTargetExpressions,
|
||||||
.isEmpty());
|
single.getTargetExpressions()).isEmpty());
|
||||||
verifyConstraintToString(expressionExpr, constraint);
|
verifyConstraintToString(expressionExpr, constraint);
|
||||||
|
|
||||||
// Invalid scope string
|
// Invalid scope string
|
||||||
|
@ -376,7 +377,11 @@ public class TestPlacementConstraintParser {
|
||||||
tag1 = result.keySet().iterator().next();
|
tag1 = result.keySet().iterator().next();
|
||||||
Assert.assertEquals("foo", tag1.getTag());
|
Assert.assertEquals("foo", tag1.getTag());
|
||||||
Assert.assertEquals(10, tag1.getNumOfAllocations());
|
Assert.assertEquals(10, tag1.getNumOfAllocations());
|
||||||
expectedPc1 = cardinality("node", 0, 100, "foo", "bar").build();
|
TargetExpression[] targetExpressions = new TargetExpression[] {
|
||||||
|
PlacementTargets.allocationTag("foo"),
|
||||||
|
PlacementTargets.allocationTag("bar")};
|
||||||
|
expectedPc1 = PlacementConstraints.targetCardinality("node", 0,
|
||||||
|
100, targetExpressions).build();
|
||||||
Assert.assertEquals(expectedPc1, result.values().iterator().next());
|
Assert.assertEquals(expectedPc1, result.values().iterator().next());
|
||||||
|
|
||||||
// Two constraint expressions
|
// Two constraint expressions
|
||||||
|
@ -524,4 +529,50 @@ public class TestPlacementConstraintParser {
|
||||||
Assert.assertTrue(e instanceof PlacementConstraintParseException);
|
Assert.assertTrue(e instanceof PlacementConstraintParseException);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParseAllocationTagNameSpace()
|
||||||
|
throws PlacementConstraintParseException {
|
||||||
|
Map<SourceTags, PlacementConstraint> result;
|
||||||
|
|
||||||
|
// Constraint with Two Different NameSpaces
|
||||||
|
result = PlacementConstraintParser
|
||||||
|
.parsePlacementSpec("foo=2,notin,node,not-self/bar,all/moo");
|
||||||
|
Assert.assertEquals(1, result.size());
|
||||||
|
Set<TargetExpression> expectedTargetExpressions = Sets.newHashSet(
|
||||||
|
PlacementTargets.allocationTagWithNamespace("not-self", "bar"),
|
||||||
|
PlacementTargets.allocationTagWithNamespace("all", "moo"));
|
||||||
|
AbstractConstraint constraint = result.values().iterator().next().
|
||||||
|
getConstraintExpr();
|
||||||
|
Assert.assertTrue(constraint instanceof SingleConstraint);
|
||||||
|
SingleConstraint single = (SingleConstraint) constraint;
|
||||||
|
Assert.assertEquals(2, single.getTargetExpressions().size());
|
||||||
|
Assert.assertTrue(Sets.difference(expectedTargetExpressions,
|
||||||
|
single.getTargetExpressions()).isEmpty());
|
||||||
|
|
||||||
|
// Constraint With Default NameSpace SELF
|
||||||
|
result = PlacementConstraintParser
|
||||||
|
.parsePlacementSpec("foo=2,notin,node,moo");
|
||||||
|
Assert.assertEquals(1, result.size());
|
||||||
|
TargetExpression expectedTargetExpression = PlacementTargets.
|
||||||
|
allocationTagWithNamespace("self", "moo");
|
||||||
|
constraint = result.values().iterator().next().getConstraintExpr();
|
||||||
|
Assert.assertTrue(constraint instanceof SingleConstraint);
|
||||||
|
single = (SingleConstraint) constraint;
|
||||||
|
Assert.assertEquals(1, single.getTargetExpressions().size());
|
||||||
|
Assert.assertEquals(expectedTargetExpression,
|
||||||
|
single.getTargetExpressions().iterator().next());
|
||||||
|
|
||||||
|
// Constraint With Invalid NameSpace
|
||||||
|
boolean caughtException = false;
|
||||||
|
try {
|
||||||
|
result = PlacementConstraintParser
|
||||||
|
.parsePlacementSpec("foo=2,notin,node,bar/moo");
|
||||||
|
} catch(PlacementConstraintParseException e) {
|
||||||
|
caughtException = true;
|
||||||
|
}
|
||||||
|
Assert.assertTrue("PlacementConstraintParseException is expected",
|
||||||
|
caughtException);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,18 +19,23 @@
|
||||||
package org.apache.hadoop.yarn.applications.distributedshell;
|
package org.apache.hadoop.yarn.applications.distributedshell;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -52,6 +57,11 @@ public class TestDSWithMultipleNodeManager {
|
||||||
distShellTest.setupInternal(NUM_NMS);
|
distShellTest.setupInternal(NUM_NMS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
distShellTest.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
private void initializeNodeLabels() throws IOException {
|
private void initializeNodeLabels() throws IOException {
|
||||||
RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
|
RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
|
||||||
|
|
||||||
|
@ -179,6 +189,141 @@ public class TestDSWithMultipleNodeManager {
|
||||||
Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
|
Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 90000)
|
||||||
|
public void testDistributedShellWithAllocationTagNamespace()
|
||||||
|
throws Exception {
|
||||||
|
NMContainerMonitor mon = new NMContainerMonitor();
|
||||||
|
Thread monitorThread = new Thread(mon);
|
||||||
|
monitorThread.start();
|
||||||
|
|
||||||
|
String[] argsA = {
|
||||||
|
"--jar",
|
||||||
|
distShellTest.APPMASTER_JAR,
|
||||||
|
"--shell_command",
|
||||||
|
distShellTest.getSleepCommand(30),
|
||||||
|
"--placement_spec",
|
||||||
|
"bar=1,notin,node,bar"
|
||||||
|
};
|
||||||
|
final Client clientA =
|
||||||
|
new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
|
||||||
|
clientA.init(argsA);
|
||||||
|
final AtomicBoolean resultA = new AtomicBoolean(false);
|
||||||
|
Thread t = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
resultA.set(clientA.run());
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
t.start();
|
||||||
|
|
||||||
|
NodeId masterContainerNodeIdA;
|
||||||
|
NodeId taskContainerNodeIdA;
|
||||||
|
ConcurrentMap<ApplicationId, RMApp> apps;
|
||||||
|
RMApp appA;
|
||||||
|
|
||||||
|
int expectedNM1Count = 0;
|
||||||
|
int expectedNM2Count = 0;
|
||||||
|
while (true) {
|
||||||
|
if ((expectedNM1Count + expectedNM2Count) < 2) {
|
||||||
|
expectedNM1Count = distShellTest.yarnCluster.getNodeManager(0).
|
||||||
|
getNMContext().getContainers().size();
|
||||||
|
expectedNM2Count = distShellTest.yarnCluster.getNodeManager(1).
|
||||||
|
getNMContext().getContainers().size();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
apps = distShellTest.yarnCluster.getResourceManager().getRMContext().
|
||||||
|
getRMApps();
|
||||||
|
if (apps.isEmpty()) {
|
||||||
|
Thread.sleep(10);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
appA = apps.values().iterator().next();
|
||||||
|
if (appA.getAppAttempts().isEmpty()) {
|
||||||
|
Thread.sleep(10);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
RMAppAttempt appAttemptA = appA.getAppAttempts().values().iterator().
|
||||||
|
next();
|
||||||
|
if (appAttemptA.getMasterContainer() == null) {
|
||||||
|
Thread.sleep(10);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
masterContainerNodeIdA = appAttemptA.getMasterContainer().getNodeId();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
NodeId nodeA = distShellTest.yarnCluster.getNodeManager(0).getNMContext().
|
||||||
|
getNodeId();
|
||||||
|
NodeId nodeB = distShellTest.yarnCluster.getNodeManager(1).getNMContext().
|
||||||
|
getNodeId();
|
||||||
|
Assert.assertEquals(2, (expectedNM1Count + expectedNM2Count));
|
||||||
|
|
||||||
|
if (expectedNM1Count != expectedNM2Count) {
|
||||||
|
taskContainerNodeIdA = masterContainerNodeIdA;
|
||||||
|
} else {
|
||||||
|
taskContainerNodeIdA = masterContainerNodeIdA.equals(nodeA) ? nodeB :
|
||||||
|
nodeA;
|
||||||
|
}
|
||||||
|
|
||||||
|
String[] argsB = {
|
||||||
|
"--jar",
|
||||||
|
distShellTest.APPMASTER_JAR,
|
||||||
|
"1",
|
||||||
|
"--shell_command",
|
||||||
|
Shell.WINDOWS ? "dir" : "ls",
|
||||||
|
"--placement_spec",
|
||||||
|
"foo=3,notin,node,all/bar"
|
||||||
|
};
|
||||||
|
final Client clientB = new Client(new Configuration(distShellTest.
|
||||||
|
yarnCluster.getConfig()));
|
||||||
|
clientB.init(argsB);
|
||||||
|
boolean resultB = clientB.run();
|
||||||
|
Assert.assertTrue(resultB);
|
||||||
|
|
||||||
|
monitorThread.interrupt();
|
||||||
|
apps = distShellTest.yarnCluster.getResourceManager().getRMContext().
|
||||||
|
getRMApps();
|
||||||
|
Iterator<RMApp> it = apps.values().iterator();
|
||||||
|
RMApp appB = it.next();
|
||||||
|
if (appA.equals(appB)) {
|
||||||
|
appB = it.next();
|
||||||
|
}
|
||||||
|
LOG.info("Allocation Tag NameSpace Applications are=" + appA.
|
||||||
|
getApplicationId() + " and " + appB.getApplicationId());
|
||||||
|
|
||||||
|
RMAppAttempt appAttemptB = appB.getAppAttempts().values().iterator().
|
||||||
|
next();
|
||||||
|
NodeId masterContainerNodeIdB = appAttemptB.getMasterContainer().
|
||||||
|
getNodeId();
|
||||||
|
|
||||||
|
if (nodeA.equals(masterContainerNodeIdB)) {
|
||||||
|
expectedNM1Count += 1;
|
||||||
|
} else {
|
||||||
|
expectedNM2Count += 1;
|
||||||
|
}
|
||||||
|
if (nodeA.equals(taskContainerNodeIdA)) {
|
||||||
|
expectedNM2Count += 3;
|
||||||
|
} else {
|
||||||
|
expectedNM1Count += 3;
|
||||||
|
}
|
||||||
|
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
|
||||||
|
Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]);
|
||||||
|
Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
|
||||||
|
|
||||||
|
try {
|
||||||
|
YarnClient yarnClient = YarnClient.createYarnClient();
|
||||||
|
yarnClient.init(new Configuration(distShellTest.yarnCluster.
|
||||||
|
getConfig()));
|
||||||
|
yarnClient.start();
|
||||||
|
yarnClient.killApplication(appA.getApplicationId());
|
||||||
|
} catch (Exception e) {
|
||||||
|
// Ignore Exception while killing a job
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Monitor containers running on NMs
|
* Monitor containers running on NMs
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue