From 737d9284c109dac20ff423f30c62f6abe2db10f7 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Sat, 8 Nov 2014 11:00:57 -0800 Subject: [PATCH] YARN-2607. Fixed issues in TestDistributedShell. Contributed by Wangda Tan. --- hadoop-yarn-project/CHANGES.txt | 2 + .../TestDistributedShell.java | 140 +-------------- .../TestDistributedShellWithNodeLabels.java | 165 ++++++++++++++++++ 3 files changed, 176 insertions(+), 131 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 748ffe078a7..9adfb8c100f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -920,6 +920,8 @@ Release 2.6.0 - UNRELEASED YARN-2826. Fixed user-groups mappings' refresh bug caused by YARN-2826. (Wangda Tan via vinodkv) + YARN-2607. Fixed issues in TestDistributedShell. (Wangda Tan via vinodkv) + Release 2.5.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index eb0fb947154..1d3a1040cf6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -30,9 +30,7 @@ import java.net.InetAddress; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -44,70 +42,37 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.apache.hadoop.yarn.server.nodemanager.NodeManager; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.ImmutableMap; - public class TestDistributedShell { private static final Log LOG = LogFactory.getLog(TestDistributedShell.class); - protected MiniYARNCluster yarnCluster = null; - private int numNodeManager = 1; - - private YarnConfiguration conf = null; + protected MiniYARNCluster yarnCluster = null; + protected YarnConfiguration conf = null; + private static final int NUM_NMS = 1; protected final static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); - - private void initializeNodeLabels() throws IOException { - RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext(); - - // Setup node labels - RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager(); - Set labels = new HashSet(); - labels.add("x"); - labelsMgr.addToCluserNodeLabels(labels); - - // Setup queue access to node labels - conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x"); - conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity", - "100"); - conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x"); - conf.set( - "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity", - "100"); - - rmContext.getScheduler().reinitialize(conf, rmContext); - - // Fetch node-ids from yarn cluster - NodeId[] nodeIds = new NodeId[numNodeManager]; - for (int i = 0; i < numNodeManager; i++) { - NodeManager mgr = this.yarnCluster.getNodeManager(i); - nodeIds[i] = mgr.getNMContext().getNodeId(); - } - - // Set label x to NM[1] - labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels)); - } @Before public void setup() throws Exception { + setupInternal(NUM_NMS); + } + + protected void setupInternal(int numNodeManager) throws Exception { + LOG.info("Starting up YARN cluster"); conf = new YarnConfiguration(); @@ -115,7 +80,6 @@ public class TestDistributedShell { conf.set("yarn.log.dir", "target"); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); - numNodeManager = 2; if (yarnCluster == null) { yarnCluster = @@ -127,9 +91,6 @@ public class TestDistributedShell { waitForNMsToRegister(); - // currently only capacity scheduler support node labels, - initializeNodeLabels(); - URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml"); if (url == null) { throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath"); @@ -807,7 +768,7 @@ public class TestDistributedShell { int sec = 60; while (sec >= 0) { if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size() - >= numNodeManager) { + >= NUM_NMS) { break; } Thread.sleep(1000); @@ -940,88 +901,5 @@ public class TestDistributedShell { } return numOfWords; } - - @Test(timeout=90000) - public void testDSShellWithNodeLabelExpression() throws Exception { - // Start NMContainerMonitor - NMContainerMonitor mon = new NMContainerMonitor(); - Thread t = new Thread(mon); - t.start(); - - // Submit a job which will sleep for 60 sec - String[] args = { - "--jar", - APPMASTER_JAR, - "--num_containers", - "4", - "--shell_command", - "sleep", - "--shell_args", - "15", - "--master_memory", - "512", - "--master_vcores", - "2", - "--container_memory", - "128", - "--container_vcores", - "1", - "--node_label_expression", - "x" - }; - - LOG.info("Initializing DS Client"); - final Client client = - new Client(new Configuration(yarnCluster.getConfig())); - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); - LOG.info("Running DS Client"); - boolean result = client.run(); - LOG.info("Client run completed. Result=" + result); - - t.interrupt(); - - // Check maximum number of containers on each NMs - int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); - // Check no container allocated on NM[0] - Assert.assertEquals(0, maxRunningContainersOnNMs[0]); - // Check there're some containers allocated on NM[1] - Assert.assertTrue(maxRunningContainersOnNMs[1] > 0); - } - - /** - * Monitor containers running on NMs - */ - private class NMContainerMonitor implements Runnable { - // The interval of milliseconds of sampling (500ms) - final static int SAMPLING_INTERVAL_MS = 500; - - // The maximum number of containers running on each NMs - int[] maxRunningContainersOnNMs = new int[numNodeManager]; - - @Override - public void run() { - while (true) { - for (int i = 0; i < numNodeManager; i++) { - int nContainers = - yarnCluster.getNodeManager(i).getNMContext().getContainers() - .size(); - if (nContainers > maxRunningContainersOnNMs[i]) { - maxRunningContainersOnNMs[i] = nContainers; - } - } - try { - Thread.sleep(SAMPLING_INTERVAL_MS); - } catch (InterruptedException e) { - e.printStackTrace(); - break; - } - } - } - - public int[] getMaxRunningContainersReport() { - return maxRunningContainersOnNMs; - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java new file mode 100644 index 00000000000..c04b7fe2e22 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShellWithNodeLabels.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +public class TestDistributedShellWithNodeLabels { + private static final Log LOG = + LogFactory.getLog(TestDistributedShellWithNodeLabels.class); + + static final int NUM_NMS = 2; + TestDistributedShell distShellTest; + + @Before + public void setup() throws Exception { + distShellTest = new TestDistributedShell(); + distShellTest.setupInternal(NUM_NMS); + } + + private void initializeNodeLabels() throws IOException { + RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext(); + + // Setup node labels + RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager(); + Set labels = new HashSet(); + labels.add("x"); + labelsMgr.addToCluserNodeLabels(labels); + + // Setup queue access to node labels + distShellTest.conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x"); + distShellTest.conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity", + "100"); + distShellTest.conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x"); + distShellTest.conf.set( + "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity", + "100"); + + rmContext.getScheduler().reinitialize(distShellTest.conf, rmContext); + + // Fetch node-ids from yarn cluster + NodeId[] nodeIds = new NodeId[NUM_NMS]; + for (int i = 0; i < NUM_NMS; i++) { + NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i); + nodeIds[i] = mgr.getNMContext().getNodeId(); + } + + // Set label x to NM[1] + labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels)); + } + + @Test(timeout=90000) + public void testDSShellWithNodeLabelExpression() throws Exception { + initializeNodeLabels(); + + // Start NMContainerMonitor + NMContainerMonitor mon = new NMContainerMonitor(); + Thread t = new Thread(mon); + t.start(); + + // Submit a job which will sleep for 60 sec + String[] args = { + "--jar", + TestDistributedShell.APPMASTER_JAR, + "--num_containers", + "4", + "--shell_command", + "sleep", + "--shell_args", + "15", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--node_label_expression", + "x" + }; + + LOG.info("Initializing DS Client"); + final Client client = + new Client(new Configuration(distShellTest.yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + LOG.info("Client run completed. Result=" + result); + + t.interrupt(); + + // Check maximum number of containers on each NMs + int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport(); + // Check no container allocated on NM[0] + Assert.assertEquals(0, maxRunningContainersOnNMs[0]); + // Check there're some containers allocated on NM[1] + Assert.assertTrue(maxRunningContainersOnNMs[1] > 0); + } + + /** + * Monitor containers running on NMs + */ + class NMContainerMonitor implements Runnable { + // The interval of milliseconds of sampling (500ms) + final static int SAMPLING_INTERVAL_MS = 500; + + // The maximum number of containers running on each NMs + int[] maxRunningContainersOnNMs = new int[NUM_NMS]; + + @Override + public void run() { + while (true) { + for (int i = 0; i < NUM_NMS; i++) { + int nContainers = + distShellTest.yarnCluster.getNodeManager(i).getNMContext() + .getContainers().size(); + if (nContainers > maxRunningContainersOnNMs[i]) { + maxRunningContainersOnNMs[i] = nContainers; + } + } + try { + Thread.sleep(SAMPLING_INTERVAL_MS); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + } + + public int[] getMaxRunningContainersReport() { + return maxRunningContainersOnNMs; + } + } +}