diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 67b1b2bf5df..b51783465d6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -329,6 +329,9 @@ Release 2.1.0-beta - UNRELEASED YARN-789. Enable zero capabilities resource requests in fair scheduler. (tucu) + YARN-639. Modified Distributed Shell application to start using the new + NMClient library. (Zhijie Shen via vinodkv) + OPTIMIZATIONS YARN-512. Log aggregation root directory check is more expensive than it diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 31b83f62b27..0c39f7a6611 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -21,15 +21,16 @@ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; -import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; -import java.security.PrivilegedAction; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Vector; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.cli.CommandLine; @@ -42,9 +43,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -71,12 +69,10 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.AMRMClientAsync; +import org.apache.hadoop.yarn.client.NMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.hadoop.yarn.util.Records; /** @@ -147,11 +143,15 @@ public class ApplicationMaster { // Configuration private Configuration conf; - // YARN RPC to communicate with the Resource Manager or Node Manager - private YarnRPC rpc; // Handle to communicate with the Resource Manager - private AMRMClientAsync resourceManager; + @SuppressWarnings("rawtypes") + private AMRMClientAsync resourceManager; + + // Handle to communicate with the Node Manager + private NMClientAsync nmClientAsync; + // Listen to process the response from the Node Manager + private NMCallbackHandler containerListener; // Application Attempt Id ( combination of attemptId and fail count ) private ApplicationAttemptId appAttemptID; @@ -273,7 +273,6 @@ public class ApplicationMaster { public ApplicationMaster() throws Exception { // Set up the configuration and RPC conf = new YarnConfiguration(); - rpc = YarnRPC.create(conf); } /** @@ -437,17 +436,20 @@ public class ApplicationMaster { * @throws YarnException * @throws IOException */ + @SuppressWarnings({ "rawtypes", "unchecked" }) public boolean run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); - - resourceManager = new AMRMClientAsync(appAttemptID, - 1000, - allocListener); + resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener); resourceManager.init(conf); resourceManager.start(); + containerListener = new NMCallbackHandler(); + nmClientAsync = new NMClientAsync(containerListener); + nmClientAsync.init(conf); + nmClientAsync.start(); + // Setup local RPC Server to accept status requests directly from clients // TODO need to setup a protocol for client to be able to communicate to // the RPC server @@ -517,6 +519,10 @@ public class ApplicationMaster { } } + // When the application completes, it should stop all running containers + LOG.info("Application completed. Stopping running containers"); + nmClientAsync.stop(); + // When the application completes, it should send a finish application // signal to the RM LOG.info("Application completed. Signalling finish to RM"); @@ -548,6 +554,7 @@ public class ApplicationMaster { } private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { + @SuppressWarnings("unchecked") @Override public void onContainersCompleted(List completedContainers) { LOG.info("Got response from RM for container ask, completedCnt=" @@ -618,8 +625,8 @@ public class ApplicationMaster { // + ", containerToken" // +allocatedContainer.getContainerToken().getIdentifier().toString()); - LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable( - allocatedContainer); + LaunchContainerRunnable runnableLaunchContainer = + new LaunchContainerRunnable(allocatedContainer, containerListener); Thread launchThread = new Thread(runnableLaunchContainer); // launch and start the container on a separate thread to keep @@ -652,6 +659,64 @@ public class ApplicationMaster { } } + private class NMCallbackHandler implements NMClientAsync.CallbackHandler { + + private ConcurrentMap containers = + new ConcurrentHashMap(); + + public void addContainer(ContainerId containerId, Container container) { + containers.putIfAbsent(containerId, container); + } + + @Override + public void onContainerStopped(ContainerId containerId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Succeeded to stop Container " + containerId); + } + containers.remove(containerId); + } + + @Override + public void onContainerStatusReceived(ContainerId containerId, + ContainerStatus containerStatus) { + if (LOG.isDebugEnabled()) { + LOG.debug("Container Status: id=" + containerId + ", status=" + + containerStatus); + } + } + + @Override + public void onContainerStarted(ContainerId containerId, + Map allServiceResponse) { + if (LOG.isDebugEnabled()) { + LOG.debug("Succeeded to start Container " + containerId); + } + Container container = containers.get(containerId); + if (container != null) { + nmClientAsync.getContainerStatus(containerId, container.getNodeId(), + container.getContainerToken()); + } + } + + @Override + public void onStartContainerError(ContainerId containerId, Throwable t) { + LOG.error("Failed to start Container " + containerId); + containers.remove(containerId); + } + + @Override + public void onGetContainerStatusError( + ContainerId containerId, Throwable t) { + LOG.error("Failed to query the status of Container " + containerId); + } + + @Override + public void onStopContainerError(ContainerId containerId, Throwable t) { + LOG.error("Failed to stop Container " + containerId); + containers.remove(containerId); + } + } + /** * Thread to connect to the {@link ContainerManager} and launch the container * that will execute the shell command. @@ -660,40 +725,17 @@ public class ApplicationMaster { // Allocated container Container container; - // Handle to communicate with ContainerManager - ContainerManager cm; + + NMCallbackHandler containerListener; /** * @param lcontainer Allocated container + * @param containerListener Callback handler of the container */ - public LaunchContainerRunnable(Container lcontainer) { + public LaunchContainerRunnable( + Container lcontainer, NMCallbackHandler containerListener) { this.container = lcontainer; - } - - /** - * Helper function to connect to CM - */ - private void connectToCM() { - LOG.debug("Connecting to ContainerManager for containerid=" - + container.getId()); - String cmIpPortStr = container.getNodeId().getHost() + ":" - + container.getNodeId().getPort(); - final InetSocketAddress cmAddress = - NetUtils.createSocketAddr(cmIpPortStr); - LOG.info("Connecting to ContainerManager at " + cmIpPortStr); - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(container.getId().toString()); - Token token = - ProtoUtils.convertFromProtoFormat(container.getContainerToken(), - cmAddress); - ugi.addToken(token); - this.cm = ugi.doAs(new PrivilegedAction() { - @Override - public ContainerManager run() { - return ((ContainerManager) rpc.getProxy(ContainerManager.class, - cmAddress, conf)); - } - }); + this.containerListener = containerListener; } @Override @@ -703,9 +745,6 @@ public class ApplicationMaster { * start request to the CM. */ public void run() { - // Connect to ContainerManager - connectToCM(); - LOG.info("Setting up container launch container for containerid=" + container.getId()); ContainerLaunchContext ctx = Records @@ -773,40 +812,8 @@ public class ApplicationMaster { commands.add(command.toString()); ctx.setCommands(commands); - StartContainerRequest startReq = Records - .newRecord(StartContainerRequest.class); - startReq.setContainerLaunchContext(ctx); - startReq.setContainerToken(container.getContainerToken()); - try { - cm.startContainer(startReq); - } catch (YarnException e) { - LOG.info("Start container failed for :" + ", containerId=" - + container.getId()); - e.printStackTrace(); - // TODO do we need to release this container? - } catch (IOException e) { - LOG.info("Start container failed for :" + ", containerId=" - + container.getId()); - e.printStackTrace(); - } - - // Get container status? - // Left commented out as the shell scripts are short lived - // and we are relying on the status for completed containers - // from RM to detect status - - // GetContainerStatusRequest statusReq = - // Records.newRecord(GetContainerStatusRequest.class); - // statusReq.setContainerId(container.getId()); - // GetContainerStatusResponse statusResp; - // try { - // statusResp = cm.getContainerStatus(statusReq); - // LOG.info("Container Status" - // + ", id=" + container.getId() - // + ", status=" +statusResp.getStatus()); - // } catch (YarnException e) { - // e.printStackTrace(); - // } + containerListener.addContainer(container.getId(), container); + nmClientAsync.startContainer(container, ctx); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 87267054f89..992691ebb9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -123,21 +123,76 @@ public class TestDistributedShell { } @Test(timeout=30000) - public void testDSShellWithNoArgs() throws Exception { - - String[] args = {}; + public void testDSShellWithInvalidArgs() throws Exception { + Client client = new Client(new Configuration(yarnCluster.getConfig())); LOG.info("Initializing DS Client with no args"); - Client client = new Client(new Configuration(yarnCluster.getConfig())); - boolean exceptionThrown = false; try { - boolean initSuccess = client.init(args); - Assert.assertTrue(initSuccess); + client.init(new String[]{}); + Assert.fail("Exception is expected"); + } catch (IllegalArgumentException e) { + Assert.assertTrue("The throw exception is not expected", + e.getMessage().contains("No args")); } - catch (IllegalArgumentException e) { - exceptionThrown = true; + + LOG.info("Initializing DS Client with no jar file"); + try { + String[] args = { + "--num_containers", + "2", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_memory", + "512", + "--container_memory", + "128" + }; + client.init(args); + Assert.fail("Exception is expected"); + } catch (IllegalArgumentException e) { + Assert.assertTrue("The throw exception is not expected", + e.getMessage().contains("No jar")); + } + + LOG.info("Initializing DS Client with no shell command"); + try { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--master_memory", + "512", + "--container_memory", + "128" + }; + client.init(args); + Assert.fail("Exception is expected"); + } catch (IllegalArgumentException e) { + Assert.assertTrue("The throw exception is not expected", + e.getMessage().contains("No shell command")); + } + + LOG.info("Initializing DS Client with invalid no. of containers"); + try { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "-1", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_memory", + "512", + "--container_memory", + "128" + }; + client.init(args); + Assert.fail("Exception is expected"); + } catch (IllegalArgumentException e) { + Assert.assertTrue("The throw exception is not expected", + e.getMessage().contains("Invalid no. of containers")); } - Assert.assertTrue(exceptionThrown); } }