diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java similarity index 72% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java index e9ba6e8731e..698682a0332 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java @@ -18,12 +18,16 @@ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.IOException; +import java.util.concurrent.ConcurrentMap; import java.util.HashSet; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; @@ -35,10 +39,10 @@ import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TestDistributedShellWithNodeLabels { +public class TestDSWithMultipleNodeManager { private static final Logger LOG = - LoggerFactory.getLogger(TestDistributedShellWithNodeLabels.class); - + LoggerFactory.getLogger(TestDSWithMultipleNodeManager.class); + static final int NUM_NMS = 2; TestDistributedShell distShellTest; @@ -47,7 +51,7 @@ public class TestDistributedShellWithNodeLabels { distShellTest = new TestDistributedShell(); distShellTest.setupInternal(NUM_NMS); } - + private void initializeNodeLabels() throws IOException { RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext(); @@ -77,11 +81,11 @@ public class TestDistributedShellWithNodeLabels { // Set label x to NM[1] labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels)); } - + @Test(timeout=90000) public void testDSShellWithNodeLabelExpression() throws Exception { initializeNodeLabels(); - + // Start NMContainerMonitor NMContainerMonitor mon = new NMContainerMonitor(); Thread t = new Thread(mon); @@ -117,9 +121,9 @@ public class TestDistributedShellWithNodeLabels { LOG.info("Running DS Client"); boolean result = client.run(); LOG.info("Client run completed. Result=" + result); - + t.interrupt(); - + // Check maximum number of containers on each NMs int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); // Check no container allocated on NM[0] @@ -127,7 +131,54 @@ public class TestDistributedShellWithNodeLabels { // Check there're some containers allocated on NM[1] Assert.assertTrue(maxRunningContainersOnNMs[1] > 0); } - + + @Test(timeout = 90000) + public void testDistributedShellWithPlacementConstraint() + throws Exception { + NMContainerMonitor mon = new NMContainerMonitor(); + Thread t = new Thread(mon); + t.start(); + + String[] args = { + "--jar", + distShellTest.APPMASTER_JAR, + "1", + "--shell_command", + distShellTest.getSleepCommand(15), + "--placement_spec", + "zk=1,NOTIN,NODE,zk:spark=1,NOTIN,NODE,zk" + }; + LOG.info("Initializing DS Client"); + final Client client = + new Client(new Configuration(distShellTest.yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + LOG.info("Client run completed. Result=" + result); + + t.interrupt(); + + ConcurrentMap apps = distShellTest.yarnCluster. + getResourceManager().getRMContext().getRMApps(); + RMApp app = apps.values().iterator().next(); + RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next(); + NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId(); + NodeManager nm1 = distShellTest.yarnCluster.getNodeManager(0); + + int expectedNM1Count = 1; + int expectedNM2Count = 1; + if (nm1.getNMContext().getNodeId().equals(masterNodeId)) { + expectedNM1Count++; + } else { + expectedNM2Count++; + } + + int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); + Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]); + Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]); + } + /** * Monitor containers running on NMs */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 3bca1ca64f8..187d13bbd4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -178,7 +178,8 @@ public class TestDistributedShell { true); conf.setBoolean( YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); - + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); // ATS version specific settings if (timelineVersion == 1.0f) { conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); @@ -781,7 +782,7 @@ public class TestDistributedShell { } - private String getSleepCommand(int sec) { + protected String getSleepCommand(int sec) { // Windows doesn't have a sleep command, ping -n does the trick return Shell.WINDOWS ? "ping -n " + (sec + 1) + " 127.0.0.1 >nul" : "sleep " + sec;