YARN-1303. Reverted the wrong patch committed earlier and committing the correct patch now. In one go.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1544029 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-11-21 04:09:46 +00:00
parent ad558cf2b3
commit a802ef4a5f
3 changed files with 31 additions and 26 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.StringReader; import java.io.StringReader;
@ -219,6 +220,8 @@ public class ApplicationMaster {
// Hardcoded path to shell script in launch container's local env // Hardcoded path to shell script in launch container's local env
private final String ExecShellStringPath = "ExecShellScript.sh"; private final String ExecShellStringPath = "ExecShellScript.sh";
private final String shellCommandPath = "shellCommands";
private volatile boolean done; private volatile boolean done;
private volatile boolean success; private volatile boolean success;
@ -301,8 +304,6 @@ public class ApplicationMaster {
Options opts = new Options(); Options opts = new Options();
opts.addOption("app_attempt_id", true, opts.addOption("app_attempt_id", true,
"App Attempt ID. Not to be used unless for testing purposes"); "App Attempt ID. Not to be used unless for testing purposes");
opts.addOption("shell_command", true,
"Shell command to be executed by the Application Master");
opts.addOption("shell_script", true, opts.addOption("shell_script", true,
"Location of the shell script to be executed"); "Location of the shell script to be executed");
opts.addOption("shell_args", true, "Command line args for the shell script"); opts.addOption("shell_args", true, "Command line args for the shell script");
@ -373,15 +374,15 @@ public class ApplicationMaster {
+ appAttemptID.getApplicationId().getClusterTimestamp() + appAttemptID.getApplicationId().getClusterTimestamp()
+ ", attemptId=" + appAttemptID.getAttemptId()); + ", attemptId=" + appAttemptID.getAttemptId());
if (!cliParser.hasOption("shell_command")) { File shellCommandFile = new File(shellCommandPath);
if (!shellCommandFile.exists()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"No shell command specified to be executed by application master"); "No shell command specified to be executed by application master");
} }
String shellCommandPath = cliParser.getOptionValue("shell_command");
FileInputStream fs = null; FileInputStream fs = null;
DataInputStream ds = null; DataInputStream ds = null;
try { try {
ds = new DataInputStream(new FileInputStream(shellCommandPath)); ds = new DataInputStream(new FileInputStream(shellCommandFile));
shellCommand = ds.readUTF(); shellCommand = ds.readUTF();
} finally { } finally {
org.apache.commons.io.IOUtils.closeQuietly(ds); org.apache.commons.io.IOUtils.closeQuietly(ds);

View File

@ -134,7 +134,6 @@ public class Client {
// Shell command to be executed // Shell command to be executed
private String shellCommand = ""; private String shellCommand = "";
private final String shellCommandPath = "shellCommands";
// Location of shell script // Location of shell script
private String shellScriptPath = ""; private String shellScriptPath = "";
// Args to be passed to the shell command // Args to be passed to the shell command
@ -166,6 +165,7 @@ public class Client {
// Command line options // Command line options
private Options opts; private Options opts;
private final String shellCommandPath = "shellCommands";
/** /**
* @param args Command line arguments * @param args Command line arguments
*/ */
@ -501,14 +501,12 @@ public class Client {
IOUtils.closeQuietly(ostream); IOUtils.closeQuietly(ostream);
} }
FileStatus scFileStatus = fs.getFileStatus(shellCommandDst); FileStatus scFileStatus = fs.getFileStatus(shellCommandDst);
LocalResource scRsrc = Records.newRecord(LocalResource.class); LocalResource scRsrc =
scRsrc.setType(LocalResourceType.FILE); LocalResource.newInstance(
scRsrc.setVisibility(LocalResourceVisibility.APPLICATION); ConverterUtils.getYarnUrlFromURI(shellCommandDst.toUri()),
scRsrc.setResource(ConverterUtils.getYarnUrlFromURI(shellCommandDst LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
.toUri())); scFileStatus.getLen(), scFileStatus.getModificationTime());
scRsrc.setTimestamp(scFileStatus.getModificationTime()); localResources.put(shellCommandPath, scRsrc);
scRsrc.setSize(scFileStatus.getLen());
localResources.put("shellCommands", scRsrc);
} }
// Set local resource info into app master container launch context // Set local resource info into app master container launch context
amContainer.setLocalResources(localResources); amContainer.setLocalResources(localResources);
@ -569,9 +567,6 @@ public class Client {
vargs.add("--num_containers " + String.valueOf(numContainers)); vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority)); vargs.add("--priority " + String.valueOf(shellCmdPriority));
if (!shellCommand.isEmpty()) {
vargs.add("--shell_command " + shellCommandPath + "");
}
if (!shellArgs.isEmpty()) { if (!shellArgs.isEmpty()) {
vargs.add("--shell_args " + shellArgs + ""); vargs.add("--shell_args " + shellArgs + "");
} }

View File

@ -175,13 +175,14 @@ public class TestDistributedShell {
@Test(timeout=90000) @Test(timeout=90000)
public void testDSShellWithCommands() throws Exception { public void testDSShellWithCommands() throws Exception {
String[] args = { String[] args = {
"--jar", "--jar",
APPMASTER_JAR, APPMASTER_JAR,
"--num_containers", "--num_containers",
"2", "2",
"--shell_command", "--shell_command",
"echo HADOOP YARN MAPREDUCE|wc -w", "\"echo output_ignored;echo output_expected\"",
"--master_memory", "--master_memory",
"512", "512",
"--master_vcores", "--master_vcores",
@ -201,8 +202,8 @@ public class TestDistributedShell {
boolean result = client.run(); boolean result = client.run();
LOG.info("Client run completed. Result=" + result); LOG.info("Client run completed. Result=" + result);
List<String> expectedContent = new ArrayList<String>(); List<String> expectedContent = new ArrayList<String>();
expectedContent.add("3"); expectedContent.add("output_expected");
verifyContainerLog(2, expectedContent); verifyContainerLog(2, expectedContent, false, "");
} }
@Test(timeout=90000) @Test(timeout=90000)
@ -368,8 +369,8 @@ public class TestDistributedShell {
Assert.assertTrue(client.run()); Assert.assertTrue(client.run());
} }
private void private int verifyContainerLog(int containerNum,
verifyContainerLog(int containerNum, List<String> expectedContent) { List<String> expectedContent, boolean count, String expectedWord) {
File logFolder = File logFolder =
new File(yarnCluster.getNodeManager(0).getConfig() new File(yarnCluster.getNodeManager(0).getConfig()
.get(YarnConfiguration.NM_LOG_DIRS, .get(YarnConfiguration.NM_LOG_DIRS,
@ -387,9 +388,10 @@ public class TestDistributedShell {
File[] containerFiles = File[] containerFiles =
listOfFiles[currentContainerLogFileIndex].listFiles(); listOfFiles[currentContainerLogFileIndex].listFiles();
int numOfWords = 0;
for (int i = 0; i < containerFiles.length; i++) { for (int i = 0; i < containerFiles.length; i++) {
for (File output : containerFiles[i].listFiles()) { for (File output : containerFiles[i].listFiles()) {
if (output.getName().trim().equalsIgnoreCase("stdout")) { if (output.getName().trim().contains("stdout")) {
BufferedReader br = null; BufferedReader br = null;
try { try {
@ -398,10 +400,16 @@ public class TestDistributedShell {
br = new BufferedReader(new FileReader(output)); br = new BufferedReader(new FileReader(output));
int numOfline = 0; int numOfline = 0;
while ((sCurrentLine = br.readLine()) != null) { while ((sCurrentLine = br.readLine()) != null) {
if (count) {
if (sCurrentLine.contains(expectedWord)) {
numOfWords++;
}
} else if (output.getName().trim().equals("stdout")){
Assert.assertEquals("The current is" + sCurrentLine, Assert.assertEquals("The current is" + sCurrentLine,
expectedContent.get(numOfline), sCurrentLine.trim()); expectedContent.get(numOfline), sCurrentLine.trim());
numOfline++; numOfline++;
} }
}
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
@ -416,6 +424,7 @@ public class TestDistributedShell {
} }
} }
} }
return numOfWords;
} }
} }