From 4d4968646a36b3b51b0b95e5a55a4a4e38c46f0d Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Mon, 11 Feb 2019 14:42:21 +0800 Subject: [PATCH] YARN-9252. Allocation Tag Namespace support in Distributed Shell. Contributed by Prabhu Joseph. (cherry picked from commit 2b7f828d4646e1837ef81e127ba86c6d77128e9b) --- .../constraint/PlacementConstraintParser.java | 63 +++++--- .../TestPlacementConstraintParser.java | 79 ++++++++-- .../TestDSWithMultipleNodeManager.java | 145 ++++++++++++++++++ 3 files changed, 253 insertions(+), 34 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java index de9419ae619..db7c8b502e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java @@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.util.constraint; import com.google.common.base.Strings; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets; import java.util.ArrayList; import java.util.Map; @@ -35,6 +38,7 @@ import java.util.List; import java.util.Stack; import java.util.Set; import java.util.TreeSet; +import java.util.HashSet; import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -51,6 +55,7 @@ public final class PlacementConstraintParser { private static final char KV_SPLIT_DELIM = '='; private static final char BRACKET_START = '('; private static final char BRACKET_END = ')'; + private static final char NAMESPACE_DELIM = '/'; private static final String KV_NE_DELIM = "!="; private static final String IN = "in"; private static final String NOT_IN = "notin"; @@ -104,6 +109,25 @@ public final class PlacementConstraintParser { } } + TargetExpression parseNameSpace(String targetTag) + throws PlacementConstraintParseException { + int i = targetTag.lastIndexOf(NAMESPACE_DELIM); + if (i != -1) { + String namespace = targetTag.substring(0, i); + for (AllocationTagNamespaceType type : + AllocationTagNamespaceType.values()) { + if (type.getTypeKeyword().equals(namespace)) { + return PlacementTargets.allocationTagWithNamespace( + namespace, targetTag.substring(i+1)); + } + } + throw new PlacementConstraintParseException( + "Invalid namespace prefix: " + namespace); + } else { + return PlacementTargets.allocationTag(targetTag); + } + } + String parseScope(String scopeString) throws PlacementConstraintParseException { if (scopeString.equalsIgnoreCase(SCOPE_NODE)) { @@ -392,12 +416,11 @@ public final class PlacementConstraintParser { + constraintEntities); } - PlacementConstraint.TargetExpression target = null; + TargetExpression target = null; if (!constraintEntities.isEmpty()) { - target = PlacementConstraints.PlacementTargets - .nodeAttribute(attributeName, - constraintEntities - .toArray(new String[constraintEntities.size()])); + target = PlacementTargets.nodeAttribute(attributeName, + constraintEntities + .toArray(new String[constraintEntities.size()])); } placementConstraints = PlacementConstraints @@ -457,23 +480,20 @@ public final class PlacementConstraintParser { String scope = nextToken(); scope = parseScope(scope); - Set constraintEntities = new TreeSet<>(); + Set targetExpressions = new HashSet<>(); while(hasMoreTokens()) { String tag = nextToken(); - constraintEntities.add(tag); - } - PlacementConstraint.TargetExpression target = null; - if(!constraintEntities.isEmpty()) { - target = PlacementConstraints.PlacementTargets.allocationTag( - constraintEntities - .toArray(new String[constraintEntities.size()])); + TargetExpression t = parseNameSpace(tag); + targetExpressions.add(t); } + TargetExpression[] targetArr = targetExpressions.toArray( + new TargetExpression[targetExpressions.size()]); if (op.equalsIgnoreCase(IN)) { placementConstraints = PlacementConstraints - .targetIn(scope, target); + .targetIn(scope, targetArr); } else { placementConstraints = PlacementConstraints - .targetNotIn(scope, target); + .targetNotIn(scope, targetArr); } } else { throw new PlacementConstraintParseException( @@ -527,13 +547,16 @@ public final class PlacementConstraintParser { String minCardinalityStr = resetElements.pop(); int min = toInt(minCardinalityStr); - ArrayList targetTags = new ArrayList<>(); + Set targetExpressions = new HashSet<>(); while (!resetElements.empty()) { - targetTags.add(resetElements.pop()); + String tag = resetElements.pop(); + TargetExpression t = parseNameSpace(tag); + targetExpressions.add(t); } + TargetExpression[] targetArr = targetExpressions.toArray( + new TargetExpression[targetExpressions.size()]); - return PlacementConstraints.cardinality(scope, min, max, - targetTags.toArray(new String[targetTags.size()])); + return PlacementConstraints.targetCardinality(scope, min, max, targetArr); } } @@ -744,4 +767,4 @@ public final class PlacementConstraintParser { } return result; } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java index 91e4fdb4e84..f5fbc2632a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java @@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.Constrai import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and; -import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; @@ -110,11 +109,13 @@ public class TestPlacementConstraintParser { Assert.assertEquals("node", single.getScope()); Assert.assertEquals(0, single.getMinCardinality()); Assert.assertEquals(0, single.getMaxCardinality()); - Assert.assertEquals(1, single.getTargetExpressions().size()); - TargetExpression exp = - single.getTargetExpressions().iterator().next(); - Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString()); - Assert.assertEquals(3, exp.getTargetValues().size()); + Assert.assertEquals(3, single.getTargetExpressions().size()); + Set expectedTargetExpressions = Sets.newHashSet( + PlacementTargets.allocationTag("foo"), + PlacementTargets.allocationTag("bar"), + PlacementTargets.allocationTag("exp")); + Assert.assertTrue(Sets.difference(expectedTargetExpressions, + single.getTargetExpressions()).isEmpty()); verifyConstraintToString(expressionStr, constraint); // Invalid OP @@ -161,13 +162,13 @@ public class TestPlacementConstraintParser { Assert.assertEquals("rack", single.getScope()); Assert.assertEquals(0, single.getMinCardinality()); Assert.assertEquals(1, single.getMaxCardinality()); - Assert.assertEquals(1, single.getTargetExpressions().size()); - exp = single.getTargetExpressions().iterator().next(); - Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString()); - Assert.assertEquals(3, exp.getTargetValues().size()); - Set expectedTags = Sets.newHashSet("foo", "bar", "moo"); - Assert.assertTrue(Sets.difference(expectedTags, exp.getTargetValues()) - .isEmpty()); + Assert.assertEquals(3, single.getTargetExpressions().size()); + Set expectedTargetExpressions = Sets.newHashSet( + PlacementTargets.allocationTag("foo"), + PlacementTargets.allocationTag("bar"), + PlacementTargets.allocationTag("moo")); + Assert.assertTrue(Sets.difference(expectedTargetExpressions, + single.getTargetExpressions()).isEmpty()); verifyConstraintToString(expressionExpr, constraint); // Invalid scope string @@ -376,7 +377,11 @@ public class TestPlacementConstraintParser { tag1 = result.keySet().iterator().next(); Assert.assertEquals("foo", tag1.getTag()); Assert.assertEquals(10, tag1.getNumOfAllocations()); - expectedPc1 = cardinality("node", 0, 100, "foo", "bar").build(); + TargetExpression[] targetExpressions = new TargetExpression[] { + PlacementTargets.allocationTag("foo"), + PlacementTargets.allocationTag("bar")}; + expectedPc1 = PlacementConstraints.targetCardinality("node", 0, + 100, targetExpressions).build(); Assert.assertEquals(expectedPc1, result.values().iterator().next()); // Two constraint expressions @@ -524,4 +529,50 @@ public class TestPlacementConstraintParser { Assert.assertTrue(e instanceof PlacementConstraintParseException); } } + + @Test + public void testParseAllocationTagNameSpace() + throws PlacementConstraintParseException { + Map result; + + // Constraint with Two Different NameSpaces + result = PlacementConstraintParser + .parsePlacementSpec("foo=2,notin,node,not-self/bar,all/moo"); + Assert.assertEquals(1, result.size()); + Set expectedTargetExpressions = Sets.newHashSet( + PlacementTargets.allocationTagWithNamespace("not-self", "bar"), + PlacementTargets.allocationTagWithNamespace("all", "moo")); + AbstractConstraint constraint = result.values().iterator().next(). + getConstraintExpr(); + Assert.assertTrue(constraint instanceof SingleConstraint); + SingleConstraint single = (SingleConstraint) constraint; + Assert.assertEquals(2, single.getTargetExpressions().size()); + Assert.assertTrue(Sets.difference(expectedTargetExpressions, + single.getTargetExpressions()).isEmpty()); + + // Constraint With Default NameSpace SELF + result = PlacementConstraintParser + .parsePlacementSpec("foo=2,notin,node,moo"); + Assert.assertEquals(1, result.size()); + TargetExpression expectedTargetExpression = PlacementTargets. + allocationTagWithNamespace("self", "moo"); + constraint = result.values().iterator().next().getConstraintExpr(); + Assert.assertTrue(constraint instanceof SingleConstraint); + single = (SingleConstraint) constraint; + Assert.assertEquals(1, single.getTargetExpressions().size()); + Assert.assertEquals(expectedTargetExpression, + single.getTargetExpressions().iterator().next()); + + // Constraint With Invalid NameSpace + boolean caughtException = false; + try { + result = PlacementConstraintParser + .parsePlacementSpec("foo=2,notin,node,bar/moo"); + } catch(PlacementConstraintParseException e) { + caughtException = true; + } + Assert.assertTrue("PlacementConstraintParseException is expected", + caughtException); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java index 698682a0332..7b940c881a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java @@ -19,18 +19,23 @@ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.IOException; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.HashSet; import java.util.Set; +import java.util.Iterator; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -52,6 +57,11 @@ public class TestDSWithMultipleNodeManager { distShellTest.setupInternal(NUM_NMS); } + @After + public void tearDown() throws Exception { + distShellTest.tearDown(); + } + private void initializeNodeLabels() throws IOException { RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext(); @@ -179,6 +189,141 @@ public class TestDSWithMultipleNodeManager { Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]); } + @Test(timeout = 90000) + public void testDistributedShellWithAllocationTagNamespace() + throws Exception { + NMContainerMonitor mon = new NMContainerMonitor(); + Thread monitorThread = new Thread(mon); + monitorThread.start(); + + String[] argsA = { + "--jar", + distShellTest.APPMASTER_JAR, + "--shell_command", + distShellTest.getSleepCommand(30), + "--placement_spec", + "bar=1,notin,node,bar" + }; + final Client clientA = + new Client(new Configuration(distShellTest.yarnCluster.getConfig())); + clientA.init(argsA); + final AtomicBoolean resultA = new AtomicBoolean(false); + Thread t = new Thread() { + public void run() { + try { + resultA.set(clientA.run()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + + NodeId masterContainerNodeIdA; + NodeId taskContainerNodeIdA; + ConcurrentMap apps; + RMApp appA; + + int expectedNM1Count = 0; + int expectedNM2Count = 0; + while (true) { + if ((expectedNM1Count + expectedNM2Count) < 2) { + expectedNM1Count = distShellTest.yarnCluster.getNodeManager(0). + getNMContext().getContainers().size(); + expectedNM2Count = distShellTest.yarnCluster.getNodeManager(1). + getNMContext().getContainers().size(); + continue; + } + apps = distShellTest.yarnCluster.getResourceManager().getRMContext(). + getRMApps(); + if (apps.isEmpty()) { + Thread.sleep(10); + continue; + } + appA = apps.values().iterator().next(); + if (appA.getAppAttempts().isEmpty()) { + Thread.sleep(10); + continue; + } + RMAppAttempt appAttemptA = appA.getAppAttempts().values().iterator(). + next(); + if (appAttemptA.getMasterContainer() == null) { + Thread.sleep(10); + continue; + } + masterContainerNodeIdA = appAttemptA.getMasterContainer().getNodeId(); + break; + } + + NodeId nodeA = distShellTest.yarnCluster.getNodeManager(0).getNMContext(). + getNodeId(); + NodeId nodeB = distShellTest.yarnCluster.getNodeManager(1).getNMContext(). + getNodeId(); + Assert.assertEquals(2, (expectedNM1Count + expectedNM2Count)); + + if (expectedNM1Count != expectedNM2Count) { + taskContainerNodeIdA = masterContainerNodeIdA; + } else { + taskContainerNodeIdA = masterContainerNodeIdA.equals(nodeA) ? nodeB : + nodeA; + } + + String[] argsB = { + "--jar", + distShellTest.APPMASTER_JAR, + "1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--placement_spec", + "foo=3,notin,node,all/bar" + }; + final Client clientB = new Client(new Configuration(distShellTest. + yarnCluster.getConfig())); + clientB.init(argsB); + boolean resultB = clientB.run(); + Assert.assertTrue(resultB); + + monitorThread.interrupt(); + apps = distShellTest.yarnCluster.getResourceManager().getRMContext(). + getRMApps(); + Iterator it = apps.values().iterator(); + RMApp appB = it.next(); + if (appA.equals(appB)) { + appB = it.next(); + } + LOG.info("Allocation Tag NameSpace Applications are=" + appA. + getApplicationId() + " and " + appB.getApplicationId()); + + RMAppAttempt appAttemptB = appB.getAppAttempts().values().iterator(). + next(); + NodeId masterContainerNodeIdB = appAttemptB.getMasterContainer(). + getNodeId(); + + if (nodeA.equals(masterContainerNodeIdB)) { + expectedNM1Count += 1; + } else { + expectedNM2Count += 1; + } + if (nodeA.equals(taskContainerNodeIdA)) { + expectedNM2Count += 3; + } else { + expectedNM1Count += 3; + } + int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); + Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]); + Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]); + + try { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(new Configuration(distShellTest.yarnCluster. + getConfig())); + yarnClient.start(); + yarnClient.killApplication(appA.getApplicationId()); + } catch (Exception e) { + // Ignore Exception while killing a job + } + } + /** * Monitor containers running on NMs */