YARN-2607. Fixed issues in TestDistributedShell. Contributed by Wangda Tan.

This commit is contained in:
Vinod Kumar Vavilapalli 2014-11-08 11:00:57 -08:00
parent 9a4e0d343e
commit 737d9284c1
3 changed files with 176 additions and 131 deletions

View File

@ -920,6 +920,8 @@ Release 2.6.0 - UNRELEASED
YARN-2826. Fixed user-groups mappings' refresh bug caused by YARN-2826.
(Wangda Tan via vinodkv)
YARN-2607. Fixed issues in TestDistributedShell. (Wangda Tan via vinodkv)
Release 2.5.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -30,9 +30,7 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@ -44,70 +42,37 @@
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
public class TestDistributedShell {
private static final Log LOG =
LogFactory.getLog(TestDistributedShell.class);
protected MiniYARNCluster yarnCluster = null;
private int numNodeManager = 1;
private YarnConfiguration conf = null;
protected MiniYARNCluster yarnCluster = null;
protected YarnConfiguration conf = null;
private static final int NUM_NMS = 1;
protected final static String APPMASTER_JAR =
JarFinder.getJar(ApplicationMaster.class);
private void initializeNodeLabels() throws IOException {
RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext();
// Setup node labels
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
Set<String> labels = new HashSet<String>();
labels.add("x");
labelsMgr.addToCluserNodeLabels(labels);
// Setup queue access to node labels
conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity",
"100");
conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
conf.set(
"yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
"100");
rmContext.getScheduler().reinitialize(conf, rmContext);
// Fetch node-ids from yarn cluster
NodeId[] nodeIds = new NodeId[numNodeManager];
for (int i = 0; i < numNodeManager; i++) {
NodeManager mgr = this.yarnCluster.getNodeManager(i);
nodeIds[i] = mgr.getNMContext().getNodeId();
}
// Set label x to NM[1]
labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
}
@Before
public void setup() throws Exception {
setupInternal(NUM_NMS);
}
protected void setupInternal(int numNodeManager) throws Exception {
LOG.info("Starting up YARN cluster");
conf = new YarnConfiguration();
@ -115,7 +80,6 @@ public void setup() throws Exception {
conf.set("yarn.log.dir", "target");
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
numNodeManager = 2;
if (yarnCluster == null) {
yarnCluster =
@ -127,9 +91,6 @@ public void setup() throws Exception {
waitForNMsToRegister();
// currently only capacity scheduler support node labels,
initializeNodeLabels();
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
if (url == null) {
throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
@ -807,7 +768,7 @@ protected void waitForNMsToRegister() throws Exception {
int sec = 60;
while (sec >= 0) {
if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size()
>= numNodeManager) {
>= NUM_NMS) {
break;
}
Thread.sleep(1000);
@ -940,88 +901,5 @@ private int verifyContainerLog(int containerNum,
}
return numOfWords;
}
@Test(timeout=90000)
public void testDSShellWithNodeLabelExpression() throws Exception {
// Start NMContainerMonitor
NMContainerMonitor mon = new NMContainerMonitor();
Thread t = new Thread(mon);
t.start();
// Submit a job which will sleep for 60 sec
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"4",
"--shell_command",
"sleep",
"--shell_args",
"15",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--node_label_expression",
"x"
};
LOG.info("Initializing DS Client");
final Client client =
new Client(new Configuration(yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
t.interrupt();
// Check maximum number of containers on each NMs
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
// Check no container allocated on NM[0]
Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
// Check there're some containers allocated on NM[1]
Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
}
/**
* Monitor containers running on NMs
*/
private class NMContainerMonitor implements Runnable {
// The interval of milliseconds of sampling (500ms)
final static int SAMPLING_INTERVAL_MS = 500;
// The maximum number of containers running on each NMs
int[] maxRunningContainersOnNMs = new int[numNodeManager];
@Override
public void run() {
while (true) {
for (int i = 0; i < numNodeManager; i++) {
int nContainers =
yarnCluster.getNodeManager(i).getNMContext().getContainers()
.size();
if (nContainers > maxRunningContainersOnNMs[i]) {
maxRunningContainersOnNMs[i] = nContainers;
}
}
try {
Thread.sleep(SAMPLING_INTERVAL_MS);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
public int[] getMaxRunningContainersReport() {
return maxRunningContainersOnNMs;
}
}
}

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
public class TestDistributedShellWithNodeLabels {
private static final Log LOG =
LogFactory.getLog(TestDistributedShellWithNodeLabels.class);
static final int NUM_NMS = 2;
TestDistributedShell distShellTest;
@Before
public void setup() throws Exception {
distShellTest = new TestDistributedShell();
distShellTest.setupInternal(NUM_NMS);
}
private void initializeNodeLabels() throws IOException {
RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
// Setup node labels
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
Set<String> labels = new HashSet<String>();
labels.add("x");
labelsMgr.addToCluserNodeLabels(labels);
// Setup queue access to node labels
distShellTest.conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
distShellTest.conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity",
"100");
distShellTest.conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
distShellTest.conf.set(
"yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
"100");
rmContext.getScheduler().reinitialize(distShellTest.conf, rmContext);
// Fetch node-ids from yarn cluster
NodeId[] nodeIds = new NodeId[NUM_NMS];
for (int i = 0; i < NUM_NMS; i++) {
NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i);
nodeIds[i] = mgr.getNMContext().getNodeId();
}
// Set label x to NM[1]
labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
}
@Test(timeout=90000)
public void testDSShellWithNodeLabelExpression() throws Exception {
initializeNodeLabels();
// Start NMContainerMonitor
NMContainerMonitor mon = new NMContainerMonitor();
Thread t = new Thread(mon);
t.start();
// Submit a job which will sleep for 60 sec
String[] args = {
"--jar",
TestDistributedShell.APPMASTER_JAR,
"--num_containers",
"4",
"--shell_command",
"sleep",
"--shell_args",
"15",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--node_label_expression",
"x"
};
LOG.info("Initializing DS Client");
final Client client =
new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
t.interrupt();
// Check maximum number of containers on each NMs
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
// Check no container allocated on NM[0]
Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
// Check there're some containers allocated on NM[1]
Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
}
/**
* Monitor containers running on NMs
*/
class NMContainerMonitor implements Runnable {
// The interval of milliseconds of sampling (500ms)
final static int SAMPLING_INTERVAL_MS = 500;
// The maximum number of containers running on each NMs
int[] maxRunningContainersOnNMs = new int[NUM_NMS];
@Override
public void run() {
while (true) {
for (int i = 0; i < NUM_NMS; i++) {
int nContainers =
distShellTest.yarnCluster.getNodeManager(i).getNMContext()
.getContainers().size();
if (nContainers > maxRunningContainersOnNMs[i]) {
maxRunningContainersOnNMs[i] = nContainers;
}
}
try {
Thread.sleep(SAMPLING_INTERVAL_MS);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
public int[] getMaxRunningContainersReport() {
return maxRunningContainersOnNMs;
}
}
}