From 8f5ec0f5f8e44fd1d78cbb2a6934e7fd50b4933d Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 21 Nov 2013 03:47:27 +0000 Subject: [PATCH] YARN-1303. Fixed DistributedShell to not fail with multiple commands separated by a semi-colon as shell-command. Contributed by Xuan Gong. svn merge --ignore-ancestry -c 1544023 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1544024 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../distributedshell/ApplicationMaster.java | 14 ++- .../applications/distributedshell/Client.java | 30 ++++++- .../TestDistributedShell.java | 86 +++++++++++++++++++ 4 files changed, 130 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0a6eedf4740..8a7d2f04bf8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -96,6 +96,9 @@ Release 2.3.0 - UNRELEASED YARN-584. In scheduler web UIs, queues unexpand on refresh. (Harshit Daga via Sandy Ryza) + YARN-1303. Fixed DistributedShell to not fail with multiple commands separated + by a semi-colon as shell-command. (Xuan Gong via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 06d96cbd198..6cf3e2dfa18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -19,8 +19,9 @@ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStreamReader; import java.io.StringReader; import java.net.URI; import java.net.URISyntaxException; @@ -376,7 +377,16 @@ public class ApplicationMaster { throw new IllegalArgumentException( "No shell command specified to be executed by application master"); } - shellCommand = cliParser.getOptionValue("shell_command"); + String shellCommandPath = cliParser.getOptionValue("shell_command"); + FileInputStream fs = null; + DataInputStream ds = null; + try { + ds = new DataInputStream(new FileInputStream(shellCommandPath)); + shellCommand = ds.readUTF(); + } finally { + org.apache.commons.io.IOUtils.closeQuietly(ds); + org.apache.commons.io.IOUtils.closeQuietly(fs); + } if (cliParser.hasOption("shell_args")) { shellArgs = cliParser.getOptionValue("shell_args"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 199a16d56f8..b40a0f936d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -32,14 +32,17 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -131,6 +134,7 @@ public class Client { // Shell command to be executed private String shellCommand = ""; + private final String shellCommandPath = "shellCommands"; // Location of shell script private String shellScriptPath = ""; // Args to be passed to the shell command @@ -483,6 +487,29 @@ public class Client { hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); } + if (!shellCommand.isEmpty()) { + String shellCommandSuffix = + appName + "/" + appId.getId() + "/" + shellCommandPath; + Path shellCommandDst = + new Path(fs.getHomeDirectory(), shellCommandSuffix); + FSDataOutputStream ostream = null; + try { + ostream = FileSystem + .create(fs, shellCommandDst, new FsPermission((short) 0710)); + ostream.writeUTF(shellCommand); + } finally { + IOUtils.closeQuietly(ostream); + } + FileStatus scFileStatus = fs.getFileStatus(shellCommandDst); + LocalResource scRsrc = Records.newRecord(LocalResource.class); + scRsrc.setType(LocalResourceType.FILE); + scRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + scRsrc.setResource(ConverterUtils.getYarnUrlFromURI(shellCommandDst + .toUri())); + scRsrc.setTimestamp(scFileStatus.getModificationTime()); + scRsrc.setSize(scFileStatus.getLen()); + localResources.put("shellCommands", scRsrc); + } // Set local resource info into app master container launch context amContainer.setLocalResources(localResources); @@ -541,8 +568,9 @@ public class Client { vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); vargs.add("--num_containers " + String.valueOf(numContainers)); vargs.add("--priority " + String.valueOf(shellCmdPriority)); + if (!shellCommand.isEmpty()) { - vargs.add("--shell_command " + shellCommand + ""); + vargs.add("--shell_command " + shellCommandPath + ""); } if (!shellArgs.isEmpty()) { vargs.add("--shell_args " + shellArgs + ""); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index a8b546d752b..59148c70938 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -18,12 +18,15 @@ package org.apache.hadoop.yarn.applications.distributedshell; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; +import java.io.FileReader; import java.io.IOException; import java.io.OutputStream; import java.net.URL; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -170,6 +173,38 @@ public class TestDistributedShell { } + @Test(timeout=90000) + public void testDSShellWithCommands() throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--shell_command", + "echo HADOOP YARN MAPREDUCE|wc -w", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1" + }; + + LOG.info("Initializing DS Client"); + final Client client = + new Client(new Configuration(yarnCluster.getConfig())); + boolean initSuccess = client.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + LOG.info("Client run completed. Result=" + result); + List expectedContent = new ArrayList(); + expectedContent.add("3"); + verifyContainerLog(2, expectedContent); + } + @Test(timeout=90000) public void testDSShellWithInvalidArgs() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); @@ -332,5 +367,56 @@ public class TestDistributedShell { LOG.info("Running DS Client"); Assert.assertTrue(client.run()); } + + private void + verifyContainerLog(int containerNum, List expectedContent) { + File logFolder = + new File(yarnCluster.getNodeManager(0).getConfig() + .get(YarnConfiguration.NM_LOG_DIRS, + YarnConfiguration.DEFAULT_NM_LOG_DIRS)); + + File[] listOfFiles = logFolder.listFiles(); + int currentContainerLogFileIndex = -1; + for (int i = listOfFiles.length - 1; i >= 0; i--) { + if (listOfFiles[i].listFiles().length == containerNum + 1) { + currentContainerLogFileIndex = i; + break; + } + } + Assert.assertTrue(currentContainerLogFileIndex != -1); + File[] containerFiles = + listOfFiles[currentContainerLogFileIndex].listFiles(); + + for (int i = 0; i < containerFiles.length; i++) { + for (File output : containerFiles[i].listFiles()) { + if (output.getName().trim().equalsIgnoreCase("stdout")) { + BufferedReader br = null; + try { + + String sCurrentLine; + + br = new BufferedReader(new FileReader(output)); + int numOfline = 0; + while ((sCurrentLine = br.readLine()) != null) { + Assert.assertEquals("The current is" + sCurrentLine, + expectedContent.get(numOfline), sCurrentLine.trim()); + numOfline++; + } + + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + if (br != null) + br.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + } + } + } + } + }