YARN-1273. Fixed Distributed-shell to account for containers that failed to start. Contributed by Hitesh Shah.

svn merge --ignore-ancestry -c 1529389 ../../trunk


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1529390 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-10-05 06:10:00 +00:00
parent c91addb743
commit 5c246b999c
5 changed files with 149 additions and 15 deletions

View File

@ -159,6 +159,9 @@ Release 2.1.2 - UNRELEASED
YARN-1254. Fixed NodeManager to not pollute container's credentials. (Omkar
Vinit Joshi via vinodkv)
YARN-1273. Fixed Distributed-shell to account for containers that failed
to start. (Hitesh Shah via vinodkv)
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES

View File

@ -34,6 +34,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@ -281,8 +282,8 @@ private void dumpOutDebugInfo() {
}
}
public ApplicationMaster() throws Exception {
// Set up the configuration and RPC
public ApplicationMaster() {
// Set up the configuration
conf = new YarnConfiguration();
}
@ -470,7 +471,7 @@ public boolean run() throws YarnException, IOException {
amRMClient.init(conf);
amRMClient.start();
containerListener = new NMCallbackHandler();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
@ -500,7 +501,6 @@ public boolean run() throws YarnException, IOException {
containerMemory = maxMem;
}
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
@ -513,7 +513,8 @@ public boolean run() throws YarnException, IOException {
}
numRequestedContainers.set(numTotalContainers);
while (!done) {
while (!done
&& (numCompletedContainers.get() != numTotalContainers)) {
try {
Thread.sleep(200);
} catch (InterruptedException ex) {}
@ -522,7 +523,12 @@ public boolean run() throws YarnException, IOException {
return success;
}
@VisibleForTesting
NMCallbackHandler createNMCallbackHandler() {
return new NMCallbackHandler(this);
}
private void finish() {
// Join all launched threads
// needed for when we time out
@ -566,7 +572,6 @@ private void finish() {
LOG.error("Failed to unregister application", e);
}
done = true;
amRMClient.stop();
}
@ -679,10 +684,17 @@ public void onError(Throwable e) {
}
}
private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
@VisibleForTesting
static class NMCallbackHandler
implements NMClientAsync.CallbackHandler {
private ConcurrentMap<ContainerId, Container> containers =
new ConcurrentHashMap<ContainerId, Container>();
private final ApplicationMaster applicationMaster;
public NMCallbackHandler(ApplicationMaster applicationMaster) {
this.applicationMaster = applicationMaster;
}
public void addContainer(ContainerId containerId, Container container) {
containers.putIfAbsent(containerId, container);
@ -713,7 +725,7 @@ public void onContainerStarted(ContainerId containerId,
}
Container container = containers.get(containerId);
if (container != null) {
nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
}
}
@ -721,6 +733,8 @@ public void onContainerStarted(ContainerId containerId,
public void onStartContainerError(ContainerId containerId, Throwable t) {
LOG.error("Failed to start Container " + containerId);
containers.remove(containerId);
applicationMaster.numCompletedContainers.incrementAndGet();
applicationMaster.numFailedContainers.incrementAndGet();
}
@Override
@ -847,7 +861,6 @@ public void run() {
/**
* Setup the request that will be sent to the RM for the container ask.
*
* @param numContainers Containers to ask for from RM
* @return the setup ResourceRequest to be sent to RM
*/
private ContainerRequest setupContainerAskForRM() {

View File

@ -125,8 +125,7 @@ public class Client {
// Application master jar file
private String appMasterJar = "";
// Main class to invoke application master
private final String appMasterMainClass =
"org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster";
private final String appMasterMainClass;
// Shell command to be executed
private String shellCommand = "";
@ -193,8 +192,14 @@ public static void main(String[] args) {
/**
*/
public Client(Configuration conf) throws Exception {
this(
"org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
conf);
}
Client(String appMasterMainClass, Configuration conf) {
this.conf = conf;
this.appMasterMainClass = appMasterMainClass;
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
opts = new Options();
@ -214,6 +219,7 @@ public Client(Configuration conf) throws Exception {
opts.addOption("log_properties", true, "log4j.properties file");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("help", false, "Print usage");
}
/**

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ContainerId;
import java.nio.ByteBuffer;
import java.util.Map;
public class ContainerLaunchFailAppMaster extends ApplicationMaster {
private static final Log LOG =
LogFactory.getLog(ContainerLaunchFailAppMaster.class);
public ContainerLaunchFailAppMaster() {
super();
}
@Override
NMCallbackHandler createNMCallbackHandler() {
return new FailContainerLaunchNMCallbackHandler(this);
}
class FailContainerLaunchNMCallbackHandler
extends ApplicationMaster.NMCallbackHandler {
public FailContainerLaunchNMCallbackHandler(
ApplicationMaster applicationMaster) {
super(applicationMaster);
}
@Override
public void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse) {
super.onStartContainerError(containerId,
new RuntimeException("Inject Container Launch failure"));
}
}
public static void main(String[] args) {
boolean result = false;
try {
ContainerLaunchFailAppMaster appMaster =
new ContainerLaunchFailAppMaster();
LOG.info("Initializing ApplicationMaster");
boolean doRun = appMaster.init(args);
if (!doRun) {
System.exit(0);
}
result = appMaster.run();
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
System.exit(1);
}
if (result) {
LOG.info("Application Master completed successfully. exiting");
System.exit(0);
} else {
LOG.info("Application Master failed. exiting");
System.exit(2);
}
}
}

View File

@ -59,7 +59,7 @@ public class TestDistributedShell {
protected static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class);
@BeforeClass
public static void setup() throws InterruptedException, Exception {
public static void setup() throws Exception {
LOG.info("Starting up YARN cluster");
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
conf.setClass(YarnConfiguration.RM_SCHEDULER,
@ -135,7 +135,7 @@ public void run() {
} catch (Exception e) {
throw new RuntimeException(e);
}
};
}
};
t.start();
@ -248,5 +248,34 @@ protected static void waitForNMToRegister(NodeManager nm)
Thread.sleep(2000);
}
}
@Test(timeout=90000)
public void testContainerLaunchFailureHandling() throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"2",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--master_memory",
"512",
"--container_memory",
"128"
};
LOG.info("Initializing DS Client");
Client client = new Client(ContainerLaunchFailAppMaster.class.getName(),
new Configuration(yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
Assert.assertFalse(result);
}
}