YARN-1273. Fixed Distributed-shell to account for containers that failed to start. Contributed by Hitesh Shah.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529389 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0a887a0910
commit
be3edccf0a
|
@ -174,6 +174,9 @@ Release 2.1.2 - UNRELEASED
|
|||
YARN-1254. Fixed NodeManager to not pollute container's credentials. (Omkar
|
||||
Vinit Joshi via vinodkv)
|
||||
|
||||
YARN-1273. Fixed Distributed-shell to account for containers that failed
|
||||
to start. (Hitesh Shah via vinodkv)
|
||||
|
||||
Release 2.1.1-beta - 2013-09-23
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.GnuParser;
|
||||
import org.apache.commons.cli.HelpFormatter;
|
||||
|
@ -281,8 +282,8 @@ public class ApplicationMaster {
|
|||
}
|
||||
}
|
||||
|
||||
public ApplicationMaster() throws Exception {
|
||||
// Set up the configuration and RPC
|
||||
public ApplicationMaster() {
|
||||
// Set up the configuration
|
||||
conf = new YarnConfiguration();
|
||||
}
|
||||
|
||||
|
@ -470,7 +471,7 @@ public class ApplicationMaster {
|
|||
amRMClient.init(conf);
|
||||
amRMClient.start();
|
||||
|
||||
containerListener = new NMCallbackHandler();
|
||||
containerListener = createNMCallbackHandler();
|
||||
nmClientAsync = new NMClientAsyncImpl(containerListener);
|
||||
nmClientAsync.init(conf);
|
||||
nmClientAsync.start();
|
||||
|
@ -500,7 +501,6 @@ public class ApplicationMaster {
|
|||
containerMemory = maxMem;
|
||||
}
|
||||
|
||||
|
||||
// Setup ask for containers from RM
|
||||
// Send request for containers to RM
|
||||
// Until we get our fully allocated quota, we keep on polling RM for
|
||||
|
@ -513,7 +513,8 @@ public class ApplicationMaster {
|
|||
}
|
||||
numRequestedContainers.set(numTotalContainers);
|
||||
|
||||
while (!done) {
|
||||
while (!done
|
||||
&& (numCompletedContainers.get() != numTotalContainers)) {
|
||||
try {
|
||||
Thread.sleep(200);
|
||||
} catch (InterruptedException ex) {}
|
||||
|
@ -522,7 +523,12 @@ public class ApplicationMaster {
|
|||
|
||||
return success;
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
NMCallbackHandler createNMCallbackHandler() {
|
||||
return new NMCallbackHandler(this);
|
||||
}
|
||||
|
||||
private void finish() {
|
||||
// Join all launched threads
|
||||
// needed for when we time out
|
||||
|
@ -566,7 +572,6 @@ public class ApplicationMaster {
|
|||
LOG.error("Failed to unregister application", e);
|
||||
}
|
||||
|
||||
done = true;
|
||||
amRMClient.stop();
|
||||
}
|
||||
|
||||
|
@ -679,10 +684,17 @@ public class ApplicationMaster {
|
|||
}
|
||||
}
|
||||
|
||||
private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
|
||||
@VisibleForTesting
|
||||
static class NMCallbackHandler
|
||||
implements NMClientAsync.CallbackHandler {
|
||||
|
||||
private ConcurrentMap<ContainerId, Container> containers =
|
||||
new ConcurrentHashMap<ContainerId, Container>();
|
||||
private final ApplicationMaster applicationMaster;
|
||||
|
||||
public NMCallbackHandler(ApplicationMaster applicationMaster) {
|
||||
this.applicationMaster = applicationMaster;
|
||||
}
|
||||
|
||||
public void addContainer(ContainerId containerId, Container container) {
|
||||
containers.putIfAbsent(containerId, container);
|
||||
|
@ -713,7 +725,7 @@ public class ApplicationMaster {
|
|||
}
|
||||
Container container = containers.get(containerId);
|
||||
if (container != null) {
|
||||
nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
|
||||
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -721,6 +733,8 @@ public class ApplicationMaster {
|
|||
public void onStartContainerError(ContainerId containerId, Throwable t) {
|
||||
LOG.error("Failed to start Container " + containerId);
|
||||
containers.remove(containerId);
|
||||
applicationMaster.numCompletedContainers.incrementAndGet();
|
||||
applicationMaster.numFailedContainers.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -847,7 +861,6 @@ public class ApplicationMaster {
|
|||
/**
|
||||
* Setup the request that will be sent to the RM for the container ask.
|
||||
*
|
||||
* @param numContainers Containers to ask for from RM
|
||||
* @return the setup ResourceRequest to be sent to RM
|
||||
*/
|
||||
private ContainerRequest setupContainerAskForRM() {
|
||||
|
|
|
@ -125,8 +125,7 @@ public class Client {
|
|||
// Application master jar file
|
||||
private String appMasterJar = "";
|
||||
// Main class to invoke application master
|
||||
private final String appMasterMainClass =
|
||||
"org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster";
|
||||
private final String appMasterMainClass;
|
||||
|
||||
// Shell command to be executed
|
||||
private String shellCommand = "";
|
||||
|
@ -193,8 +192,14 @@ public class Client {
|
|||
/**
|
||||
*/
|
||||
public Client(Configuration conf) throws Exception {
|
||||
|
||||
this(
|
||||
"org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
|
||||
conf);
|
||||
}
|
||||
|
||||
Client(String appMasterMainClass, Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.appMasterMainClass = appMasterMainClass;
|
||||
yarnClient = YarnClient.createYarnClient();
|
||||
yarnClient.init(conf);
|
||||
opts = new Options();
|
||||
|
@ -214,6 +219,7 @@ public class Client {
|
|||
opts.addOption("log_properties", true, "log4j.properties file");
|
||||
opts.addOption("debug", false, "Dump out debug information");
|
||||
opts.addOption("help", false, "Print usage");
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.applications.distributedshell;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
|
||||
public class ContainerLaunchFailAppMaster extends ApplicationMaster {
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(ContainerLaunchFailAppMaster.class);
|
||||
|
||||
public ContainerLaunchFailAppMaster() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
NMCallbackHandler createNMCallbackHandler() {
|
||||
return new FailContainerLaunchNMCallbackHandler(this);
|
||||
}
|
||||
|
||||
class FailContainerLaunchNMCallbackHandler
|
||||
extends ApplicationMaster.NMCallbackHandler {
|
||||
|
||||
public FailContainerLaunchNMCallbackHandler(
|
||||
ApplicationMaster applicationMaster) {
|
||||
super(applicationMaster);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContainerStarted(ContainerId containerId,
|
||||
Map<String, ByteBuffer> allServiceResponse) {
|
||||
super.onStartContainerError(containerId,
|
||||
new RuntimeException("Inject Container Launch failure"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
boolean result = false;
|
||||
try {
|
||||
ContainerLaunchFailAppMaster appMaster =
|
||||
new ContainerLaunchFailAppMaster();
|
||||
LOG.info("Initializing ApplicationMaster");
|
||||
boolean doRun = appMaster.init(args);
|
||||
if (!doRun) {
|
||||
System.exit(0);
|
||||
}
|
||||
result = appMaster.run();
|
||||
} catch (Throwable t) {
|
||||
LOG.fatal("Error running ApplicationMaster", t);
|
||||
System.exit(1);
|
||||
}
|
||||
if (result) {
|
||||
LOG.info("Application Master completed successfully. exiting");
|
||||
System.exit(0);
|
||||
} else {
|
||||
LOG.info("Application Master failed. exiting");
|
||||
System.exit(2);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -59,7 +59,7 @@ public class TestDistributedShell {
|
|||
protected static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class);
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws InterruptedException, Exception {
|
||||
public static void setup() throws Exception {
|
||||
LOG.info("Starting up YARN cluster");
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER,
|
||||
|
@ -135,7 +135,7 @@ public class TestDistributedShell {
|
|||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
|
||||
|
@ -248,5 +248,34 @@ public class TestDistributedShell {
|
|||
Thread.sleep(2000);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=90000)
|
||||
public void testContainerLaunchFailureHandling() throws Exception {
|
||||
String[] args = {
|
||||
"--jar",
|
||||
APPMASTER_JAR,
|
||||
"--num_containers",
|
||||
"2",
|
||||
"--shell_command",
|
||||
Shell.WINDOWS ? "dir" : "ls",
|
||||
"--master_memory",
|
||||
"512",
|
||||
"--container_memory",
|
||||
"128"
|
||||
};
|
||||
|
||||
LOG.info("Initializing DS Client");
|
||||
Client client = new Client(ContainerLaunchFailAppMaster.class.getName(),
|
||||
new Configuration(yarnCluster.getConfig()));
|
||||
boolean initSuccess = client.init(args);
|
||||
Assert.assertTrue(initSuccess);
|
||||
LOG.info("Running DS Client");
|
||||
boolean result = client.run();
|
||||
|
||||
LOG.info("Client run completed. Result=" + result);
|
||||
Assert.assertFalse(result);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue