diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index dae5f0ba006..0c0a61ed3e6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -159,6 +159,9 @@ Release 2.1.2 - UNRELEASED YARN-1254. Fixed NodeManager to not pollute container's credentials. (Omkar Vinit Joshi via vinodkv) + YARN-1273. Fixed Distributed-shell to account for containers that failed + to start. (Hitesh Shah via vinodkv) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 959ba1c45f0..fa6eb9040d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -281,8 +282,8 @@ public class ApplicationMaster { } } - public ApplicationMaster() throws Exception { - // Set up the configuration and RPC + public ApplicationMaster() { + // Set up the configuration conf = new YarnConfiguration(); } @@ -470,7 +471,7 @@ public class ApplicationMaster { amRMClient.init(conf); amRMClient.start(); - containerListener = new NMCallbackHandler(); + containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start(); @@ -500,7 +501,6 @@ public class ApplicationMaster { containerMemory = maxMem; } - // Setup ask for containers from RM // Send request for containers to RM // Until we get our fully allocated quota, we keep on polling RM for @@ -513,7 +513,8 @@ public class ApplicationMaster { } numRequestedContainers.set(numTotalContainers); - while (!done) { + while (!done + && (numCompletedContainers.get() != numTotalContainers)) { try { Thread.sleep(200); } catch (InterruptedException ex) {} @@ -522,7 +523,12 @@ public class ApplicationMaster { return success; } - + + @VisibleForTesting + NMCallbackHandler createNMCallbackHandler() { + return new NMCallbackHandler(this); + } + private void finish() { // Join all launched threads // needed for when we time out @@ -566,7 +572,6 @@ public class ApplicationMaster { LOG.error("Failed to unregister application", e); } - done = true; amRMClient.stop(); } @@ -679,10 +684,17 @@ public class ApplicationMaster { } } - private class NMCallbackHandler implements NMClientAsync.CallbackHandler { + @VisibleForTesting + static class NMCallbackHandler + implements NMClientAsync.CallbackHandler { private ConcurrentMap containers = new ConcurrentHashMap(); + private final ApplicationMaster applicationMaster; + + public NMCallbackHandler(ApplicationMaster applicationMaster) { + this.applicationMaster = applicationMaster; + } public void addContainer(ContainerId containerId, Container container) { containers.putIfAbsent(containerId, container); @@ -713,7 +725,7 @@ public class ApplicationMaster { } Container container = containers.get(containerId); if (container != null) { - nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); + applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); } } @@ -721,6 +733,8 @@ public class ApplicationMaster { public void onStartContainerError(ContainerId containerId, Throwable t) { LOG.error("Failed to start Container " + containerId); containers.remove(containerId); + applicationMaster.numCompletedContainers.incrementAndGet(); + applicationMaster.numFailedContainers.incrementAndGet(); } @Override @@ -847,7 +861,6 @@ public class ApplicationMaster { /** * Setup the request that will be sent to the RM for the container ask. * - * @param numContainers Containers to ask for from RM * @return the setup ResourceRequest to be sent to RM */ private ContainerRequest setupContainerAskForRM() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 7d51a6783f5..01e030a6776 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -125,8 +125,7 @@ public class Client { // Application master jar file private String appMasterJar = ""; // Main class to invoke application master - private final String appMasterMainClass = - "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster"; + private final String appMasterMainClass; // Shell command to be executed private String shellCommand = ""; @@ -193,8 +192,14 @@ public class Client { /** */ public Client(Configuration conf) throws Exception { - + this( + "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster", + conf); + } + + Client(String appMasterMainClass, Configuration conf) { this.conf = conf; + this.appMasterMainClass = appMasterMainClass; yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); opts = new Options(); @@ -214,6 +219,7 @@ public class Client { opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); + } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java new file mode 100644 index 00000000000..2692fff5014 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class ContainerLaunchFailAppMaster extends ApplicationMaster { + + private static final Log LOG = + LogFactory.getLog(ContainerLaunchFailAppMaster.class); + + public ContainerLaunchFailAppMaster() { + super(); + } + + @Override + NMCallbackHandler createNMCallbackHandler() { + return new FailContainerLaunchNMCallbackHandler(this); + } + + class FailContainerLaunchNMCallbackHandler + extends ApplicationMaster.NMCallbackHandler { + + public FailContainerLaunchNMCallbackHandler( + ApplicationMaster applicationMaster) { + super(applicationMaster); + } + + @Override + public void onContainerStarted(ContainerId containerId, + Map allServiceResponse) { + super.onStartContainerError(containerId, + new RuntimeException("Inject Container Launch failure")); + } + + } + + public static void main(String[] args) { + boolean result = false; + try { + ContainerLaunchFailAppMaster appMaster = + new ContainerLaunchFailAppMaster(); + LOG.info("Initializing ApplicationMaster"); + boolean doRun = appMaster.init(args); + if (!doRun) { + System.exit(0); + } + result = appMaster.run(); + } catch (Throwable t) { + LOG.fatal("Error running ApplicationMaster", t); + System.exit(1); + } + if (result) { + LOG.info("Application Master completed successfully. exiting"); + System.exit(0); + } else { + LOG.info("Application Master failed. exiting"); + System.exit(2); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 7fbd2a6c9de..f8a41b7395b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -59,7 +59,7 @@ public class TestDistributedShell { protected static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class); @BeforeClass - public static void setup() throws InterruptedException, Exception { + public static void setup() throws Exception { LOG.info("Starting up YARN cluster"); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); conf.setClass(YarnConfiguration.RM_SCHEDULER, @@ -135,7 +135,7 @@ public class TestDistributedShell { } catch (Exception e) { throw new RuntimeException(e); } - }; + } }; t.start(); @@ -248,5 +248,34 @@ public class TestDistributedShell { Thread.sleep(2000); } } + + @Test(timeout=90000) + public void testContainerLaunchFailureHandling() throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_memory", + "512", + "--container_memory", + "128" + }; + + LOG.info("Initializing DS Client"); + Client client = new Client(ContainerLaunchFailAppMaster.class.getName(), + new Configuration(yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + + LOG.info("Client run completed. Result=" + result); + Assert.assertFalse(result); + + } + }