YARN-2607. Fixed issues in TestDistributedShell. Contributed by Wangda Tan.
(cherry picked from commit 737d9284c1
)
This commit is contained in:
parent
1880a5a7c3
commit
0a5d95f705
|
@ -893,6 +893,8 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2826. Fixed user-groups mappings' refresh bug caused by YARN-2826.
|
YARN-2826. Fixed user-groups mappings' refresh bug caused by YARN-2826.
|
||||||
(Wangda Tan via vinodkv)
|
(Wangda Tan via vinodkv)
|
||||||
|
|
||||||
|
YARN-2607. Fixed issues in TestDistributedShell. (Wangda Tan via vinodkv)
|
||||||
|
|
||||||
Release 2.5.2 - UNRELEASED
|
Release 2.5.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -30,9 +30,7 @@ import java.net.InetAddress;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -44,70 +42,37 @@ import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.util.JarFinder;
|
import org.apache.hadoop.util.JarFinder;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
|
||||||
|
|
||||||
public class TestDistributedShell {
|
public class TestDistributedShell {
|
||||||
|
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(TestDistributedShell.class);
|
LogFactory.getLog(TestDistributedShell.class);
|
||||||
|
|
||||||
protected MiniYARNCluster yarnCluster = null;
|
protected MiniYARNCluster yarnCluster = null;
|
||||||
private int numNodeManager = 1;
|
protected YarnConfiguration conf = null;
|
||||||
|
private static final int NUM_NMS = 1;
|
||||||
private YarnConfiguration conf = null;
|
|
||||||
|
|
||||||
protected final static String APPMASTER_JAR =
|
protected final static String APPMASTER_JAR =
|
||||||
JarFinder.getJar(ApplicationMaster.class);
|
JarFinder.getJar(ApplicationMaster.class);
|
||||||
|
|
||||||
private void initializeNodeLabels() throws IOException {
|
|
||||||
RMContext rmContext = yarnCluster.getResourceManager(0).getRMContext();
|
|
||||||
|
|
||||||
// Setup node labels
|
|
||||||
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
|
|
||||||
Set<String> labels = new HashSet<String>();
|
|
||||||
labels.add("x");
|
|
||||||
labelsMgr.addToCluserNodeLabels(labels);
|
|
||||||
|
|
||||||
// Setup queue access to node labels
|
|
||||||
conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
|
|
||||||
conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity",
|
|
||||||
"100");
|
|
||||||
conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
|
|
||||||
conf.set(
|
|
||||||
"yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
|
|
||||||
"100");
|
|
||||||
|
|
||||||
rmContext.getScheduler().reinitialize(conf, rmContext);
|
|
||||||
|
|
||||||
// Fetch node-ids from yarn cluster
|
|
||||||
NodeId[] nodeIds = new NodeId[numNodeManager];
|
|
||||||
for (int i = 0; i < numNodeManager; i++) {
|
|
||||||
NodeManager mgr = this.yarnCluster.getNodeManager(i);
|
|
||||||
nodeIds[i] = mgr.getNMContext().getNodeId();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set label x to NM[1]
|
|
||||||
labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
|
setupInternal(NUM_NMS);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setupInternal(int numNodeManager) throws Exception {
|
||||||
|
|
||||||
LOG.info("Starting up YARN cluster");
|
LOG.info("Starting up YARN cluster");
|
||||||
|
|
||||||
conf = new YarnConfiguration();
|
conf = new YarnConfiguration();
|
||||||
|
@ -115,7 +80,6 @@ public class TestDistributedShell {
|
||||||
conf.set("yarn.log.dir", "target");
|
conf.set("yarn.log.dir", "target");
|
||||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
|
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
|
||||||
numNodeManager = 2;
|
|
||||||
|
|
||||||
if (yarnCluster == null) {
|
if (yarnCluster == null) {
|
||||||
yarnCluster =
|
yarnCluster =
|
||||||
|
@ -127,9 +91,6 @@ public class TestDistributedShell {
|
||||||
|
|
||||||
waitForNMsToRegister();
|
waitForNMsToRegister();
|
||||||
|
|
||||||
// currently only capacity scheduler support node labels,
|
|
||||||
initializeNodeLabels();
|
|
||||||
|
|
||||||
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
|
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
|
||||||
if (url == null) {
|
if (url == null) {
|
||||||
throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
|
throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
|
||||||
|
@ -807,7 +768,7 @@ public class TestDistributedShell {
|
||||||
int sec = 60;
|
int sec = 60;
|
||||||
while (sec >= 0) {
|
while (sec >= 0) {
|
||||||
if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size()
|
if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size()
|
||||||
>= numNodeManager) {
|
>= NUM_NMS) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
@ -940,88 +901,5 @@ public class TestDistributedShell {
|
||||||
}
|
}
|
||||||
return numOfWords;
|
return numOfWords;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=90000)
|
|
||||||
public void testDSShellWithNodeLabelExpression() throws Exception {
|
|
||||||
// Start NMContainerMonitor
|
|
||||||
NMContainerMonitor mon = new NMContainerMonitor();
|
|
||||||
Thread t = new Thread(mon);
|
|
||||||
t.start();
|
|
||||||
|
|
||||||
// Submit a job which will sleep for 60 sec
|
|
||||||
String[] args = {
|
|
||||||
"--jar",
|
|
||||||
APPMASTER_JAR,
|
|
||||||
"--num_containers",
|
|
||||||
"4",
|
|
||||||
"--shell_command",
|
|
||||||
"sleep",
|
|
||||||
"--shell_args",
|
|
||||||
"15",
|
|
||||||
"--master_memory",
|
|
||||||
"512",
|
|
||||||
"--master_vcores",
|
|
||||||
"2",
|
|
||||||
"--container_memory",
|
|
||||||
"128",
|
|
||||||
"--container_vcores",
|
|
||||||
"1",
|
|
||||||
"--node_label_expression",
|
|
||||||
"x"
|
|
||||||
};
|
|
||||||
|
|
||||||
LOG.info("Initializing DS Client");
|
|
||||||
final Client client =
|
|
||||||
new Client(new Configuration(yarnCluster.getConfig()));
|
|
||||||
boolean initSuccess = client.init(args);
|
|
||||||
Assert.assertTrue(initSuccess);
|
|
||||||
LOG.info("Running DS Client");
|
|
||||||
boolean result = client.run();
|
|
||||||
LOG.info("Client run completed. Result=" + result);
|
|
||||||
|
|
||||||
t.interrupt();
|
|
||||||
|
|
||||||
// Check maximum number of containers on each NMs
|
|
||||||
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
|
|
||||||
// Check no container allocated on NM[0]
|
|
||||||
Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
|
|
||||||
// Check there're some containers allocated on NM[1]
|
|
||||||
Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Monitor containers running on NMs
|
|
||||||
*/
|
|
||||||
private class NMContainerMonitor implements Runnable {
|
|
||||||
// The interval of milliseconds of sampling (500ms)
|
|
||||||
final static int SAMPLING_INTERVAL_MS = 500;
|
|
||||||
|
|
||||||
// The maximum number of containers running on each NMs
|
|
||||||
int[] maxRunningContainersOnNMs = new int[numNodeManager];
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
while (true) {
|
|
||||||
for (int i = 0; i < numNodeManager; i++) {
|
|
||||||
int nContainers =
|
|
||||||
yarnCluster.getNodeManager(i).getNMContext().getContainers()
|
|
||||||
.size();
|
|
||||||
if (nContainers > maxRunningContainersOnNMs[i]) {
|
|
||||||
maxRunningContainersOnNMs[i] = nContainers;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(SAMPLING_INTERVAL_MS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public int[] getMaxRunningContainersReport() {
|
|
||||||
return maxRunningContainersOnNMs;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,165 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.applications.distributedshell;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
|
||||||
|
public class TestDistributedShellWithNodeLabels {
|
||||||
|
private static final Log LOG =
|
||||||
|
LogFactory.getLog(TestDistributedShellWithNodeLabels.class);
|
||||||
|
|
||||||
|
static final int NUM_NMS = 2;
|
||||||
|
TestDistributedShell distShellTest;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
distShellTest = new TestDistributedShell();
|
||||||
|
distShellTest.setupInternal(NUM_NMS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeNodeLabels() throws IOException {
|
||||||
|
RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
|
||||||
|
|
||||||
|
// Setup node labels
|
||||||
|
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
|
||||||
|
Set<String> labels = new HashSet<String>();
|
||||||
|
labels.add("x");
|
||||||
|
labelsMgr.addToCluserNodeLabels(labels);
|
||||||
|
|
||||||
|
// Setup queue access to node labels
|
||||||
|
distShellTest.conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
|
||||||
|
distShellTest.conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity",
|
||||||
|
"100");
|
||||||
|
distShellTest.conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
|
||||||
|
distShellTest.conf.set(
|
||||||
|
"yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
|
||||||
|
"100");
|
||||||
|
|
||||||
|
rmContext.getScheduler().reinitialize(distShellTest.conf, rmContext);
|
||||||
|
|
||||||
|
// Fetch node-ids from yarn cluster
|
||||||
|
NodeId[] nodeIds = new NodeId[NUM_NMS];
|
||||||
|
for (int i = 0; i < NUM_NMS; i++) {
|
||||||
|
NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i);
|
||||||
|
nodeIds[i] = mgr.getNMContext().getNodeId();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set label x to NM[1]
|
||||||
|
labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=90000)
|
||||||
|
public void testDSShellWithNodeLabelExpression() throws Exception {
|
||||||
|
initializeNodeLabels();
|
||||||
|
|
||||||
|
// Start NMContainerMonitor
|
||||||
|
NMContainerMonitor mon = new NMContainerMonitor();
|
||||||
|
Thread t = new Thread(mon);
|
||||||
|
t.start();
|
||||||
|
|
||||||
|
// Submit a job which will sleep for 60 sec
|
||||||
|
String[] args = {
|
||||||
|
"--jar",
|
||||||
|
TestDistributedShell.APPMASTER_JAR,
|
||||||
|
"--num_containers",
|
||||||
|
"4",
|
||||||
|
"--shell_command",
|
||||||
|
"sleep",
|
||||||
|
"--shell_args",
|
||||||
|
"15",
|
||||||
|
"--master_memory",
|
||||||
|
"512",
|
||||||
|
"--master_vcores",
|
||||||
|
"2",
|
||||||
|
"--container_memory",
|
||||||
|
"128",
|
||||||
|
"--container_vcores",
|
||||||
|
"1",
|
||||||
|
"--node_label_expression",
|
||||||
|
"x"
|
||||||
|
};
|
||||||
|
|
||||||
|
LOG.info("Initializing DS Client");
|
||||||
|
final Client client =
|
||||||
|
new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
|
||||||
|
boolean initSuccess = client.init(args);
|
||||||
|
Assert.assertTrue(initSuccess);
|
||||||
|
LOG.info("Running DS Client");
|
||||||
|
boolean result = client.run();
|
||||||
|
LOG.info("Client run completed. Result=" + result);
|
||||||
|
|
||||||
|
t.interrupt();
|
||||||
|
|
||||||
|
// Check maximum number of containers on each NMs
|
||||||
|
int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
|
||||||
|
// Check no container allocated on NM[0]
|
||||||
|
Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
|
||||||
|
// Check there're some containers allocated on NM[1]
|
||||||
|
Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Monitor containers running on NMs
|
||||||
|
*/
|
||||||
|
class NMContainerMonitor implements Runnable {
|
||||||
|
// The interval of milliseconds of sampling (500ms)
|
||||||
|
final static int SAMPLING_INTERVAL_MS = 500;
|
||||||
|
|
||||||
|
// The maximum number of containers running on each NMs
|
||||||
|
int[] maxRunningContainersOnNMs = new int[NUM_NMS];
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (true) {
|
||||||
|
for (int i = 0; i < NUM_NMS; i++) {
|
||||||
|
int nContainers =
|
||||||
|
distShellTest.yarnCluster.getNodeManager(i).getNMContext()
|
||||||
|
.getContainers().size();
|
||||||
|
if (nContainers > maxRunningContainersOnNMs[i]) {
|
||||||
|
maxRunningContainersOnNMs[i] = nContainers;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(SAMPLING_INTERVAL_MS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int[] getMaxRunningContainersReport() {
|
||||||
|
return maxRunningContainersOnNMs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue