YARN-1303. Reverted the wrong patch committed earlier and committing the correct patch now. In one go.
svn merge --ignore-ancestry -c 1544029 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1544030 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8f5ec0f5f8
commit
83e907c32c
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.applications.distributedshell;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
|
@ -219,6 +220,8 @@ public class ApplicationMaster {
|
||||||
// Hardcoded path to shell script in launch container's local env
|
// Hardcoded path to shell script in launch container's local env
|
||||||
private final String ExecShellStringPath = "ExecShellScript.sh";
|
private final String ExecShellStringPath = "ExecShellScript.sh";
|
||||||
|
|
||||||
|
private final String shellCommandPath = "shellCommands";
|
||||||
|
|
||||||
private volatile boolean done;
|
private volatile boolean done;
|
||||||
private volatile boolean success;
|
private volatile boolean success;
|
||||||
|
|
||||||
|
@ -301,8 +304,6 @@ public class ApplicationMaster {
|
||||||
Options opts = new Options();
|
Options opts = new Options();
|
||||||
opts.addOption("app_attempt_id", true,
|
opts.addOption("app_attempt_id", true,
|
||||||
"App Attempt ID. Not to be used unless for testing purposes");
|
"App Attempt ID. Not to be used unless for testing purposes");
|
||||||
opts.addOption("shell_command", true,
|
|
||||||
"Shell command to be executed by the Application Master");
|
|
||||||
opts.addOption("shell_script", true,
|
opts.addOption("shell_script", true,
|
||||||
"Location of the shell script to be executed");
|
"Location of the shell script to be executed");
|
||||||
opts.addOption("shell_args", true, "Command line args for the shell script");
|
opts.addOption("shell_args", true, "Command line args for the shell script");
|
||||||
|
@ -373,15 +374,15 @@ public class ApplicationMaster {
|
||||||
+ appAttemptID.getApplicationId().getClusterTimestamp()
|
+ appAttemptID.getApplicationId().getClusterTimestamp()
|
||||||
+ ", attemptId=" + appAttemptID.getAttemptId());
|
+ ", attemptId=" + appAttemptID.getAttemptId());
|
||||||
|
|
||||||
if (!cliParser.hasOption("shell_command")) {
|
File shellCommandFile = new File(shellCommandPath);
|
||||||
|
if (!shellCommandFile.exists()) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"No shell command specified to be executed by application master");
|
"No shell command specified to be executed by application master");
|
||||||
}
|
}
|
||||||
String shellCommandPath = cliParser.getOptionValue("shell_command");
|
|
||||||
FileInputStream fs = null;
|
FileInputStream fs = null;
|
||||||
DataInputStream ds = null;
|
DataInputStream ds = null;
|
||||||
try {
|
try {
|
||||||
ds = new DataInputStream(new FileInputStream(shellCommandPath));
|
ds = new DataInputStream(new FileInputStream(shellCommandFile));
|
||||||
shellCommand = ds.readUTF();
|
shellCommand = ds.readUTF();
|
||||||
} finally {
|
} finally {
|
||||||
org.apache.commons.io.IOUtils.closeQuietly(ds);
|
org.apache.commons.io.IOUtils.closeQuietly(ds);
|
||||||
|
|
|
@ -134,7 +134,6 @@ public class Client {
|
||||||
|
|
||||||
// Shell command to be executed
|
// Shell command to be executed
|
||||||
private String shellCommand = "";
|
private String shellCommand = "";
|
||||||
private final String shellCommandPath = "shellCommands";
|
|
||||||
// Location of shell script
|
// Location of shell script
|
||||||
private String shellScriptPath = "";
|
private String shellScriptPath = "";
|
||||||
// Args to be passed to the shell command
|
// Args to be passed to the shell command
|
||||||
|
@ -166,6 +165,7 @@ public class Client {
|
||||||
// Command line options
|
// Command line options
|
||||||
private Options opts;
|
private Options opts;
|
||||||
|
|
||||||
|
private final String shellCommandPath = "shellCommands";
|
||||||
/**
|
/**
|
||||||
* @param args Command line arguments
|
* @param args Command line arguments
|
||||||
*/
|
*/
|
||||||
|
@ -501,14 +501,12 @@ public class Client {
|
||||||
IOUtils.closeQuietly(ostream);
|
IOUtils.closeQuietly(ostream);
|
||||||
}
|
}
|
||||||
FileStatus scFileStatus = fs.getFileStatus(shellCommandDst);
|
FileStatus scFileStatus = fs.getFileStatus(shellCommandDst);
|
||||||
LocalResource scRsrc = Records.newRecord(LocalResource.class);
|
LocalResource scRsrc =
|
||||||
scRsrc.setType(LocalResourceType.FILE);
|
LocalResource.newInstance(
|
||||||
scRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
|
ConverterUtils.getYarnUrlFromURI(shellCommandDst.toUri()),
|
||||||
scRsrc.setResource(ConverterUtils.getYarnUrlFromURI(shellCommandDst
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
|
||||||
.toUri()));
|
scFileStatus.getLen(), scFileStatus.getModificationTime());
|
||||||
scRsrc.setTimestamp(scFileStatus.getModificationTime());
|
localResources.put(shellCommandPath, scRsrc);
|
||||||
scRsrc.setSize(scFileStatus.getLen());
|
|
||||||
localResources.put("shellCommands", scRsrc);
|
|
||||||
}
|
}
|
||||||
// Set local resource info into app master container launch context
|
// Set local resource info into app master container launch context
|
||||||
amContainer.setLocalResources(localResources);
|
amContainer.setLocalResources(localResources);
|
||||||
|
@ -569,9 +567,6 @@ public class Client {
|
||||||
vargs.add("--num_containers " + String.valueOf(numContainers));
|
vargs.add("--num_containers " + String.valueOf(numContainers));
|
||||||
vargs.add("--priority " + String.valueOf(shellCmdPriority));
|
vargs.add("--priority " + String.valueOf(shellCmdPriority));
|
||||||
|
|
||||||
if (!shellCommand.isEmpty()) {
|
|
||||||
vargs.add("--shell_command " + shellCommandPath + "");
|
|
||||||
}
|
|
||||||
if (!shellArgs.isEmpty()) {
|
if (!shellArgs.isEmpty()) {
|
||||||
vargs.add("--shell_args " + shellArgs + "");
|
vargs.add("--shell_args " + shellArgs + "");
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,13 +175,14 @@ public class TestDistributedShell {
|
||||||
|
|
||||||
@Test(timeout=90000)
|
@Test(timeout=90000)
|
||||||
public void testDSShellWithCommands() throws Exception {
|
public void testDSShellWithCommands() throws Exception {
|
||||||
|
|
||||||
String[] args = {
|
String[] args = {
|
||||||
"--jar",
|
"--jar",
|
||||||
APPMASTER_JAR,
|
APPMASTER_JAR,
|
||||||
"--num_containers",
|
"--num_containers",
|
||||||
"2",
|
"2",
|
||||||
"--shell_command",
|
"--shell_command",
|
||||||
"echo HADOOP YARN MAPREDUCE|wc -w",
|
"\"echo output_ignored;echo output_expected\"",
|
||||||
"--master_memory",
|
"--master_memory",
|
||||||
"512",
|
"512",
|
||||||
"--master_vcores",
|
"--master_vcores",
|
||||||
|
@ -201,8 +202,8 @@ public class TestDistributedShell {
|
||||||
boolean result = client.run();
|
boolean result = client.run();
|
||||||
LOG.info("Client run completed. Result=" + result);
|
LOG.info("Client run completed. Result=" + result);
|
||||||
List<String> expectedContent = new ArrayList<String>();
|
List<String> expectedContent = new ArrayList<String>();
|
||||||
expectedContent.add("3");
|
expectedContent.add("output_expected");
|
||||||
verifyContainerLog(2, expectedContent);
|
verifyContainerLog(2, expectedContent, false, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=90000)
|
@Test(timeout=90000)
|
||||||
|
@ -368,8 +369,8 @@ public class TestDistributedShell {
|
||||||
Assert.assertTrue(client.run());
|
Assert.assertTrue(client.run());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void
|
private int verifyContainerLog(int containerNum,
|
||||||
verifyContainerLog(int containerNum, List<String> expectedContent) {
|
List<String> expectedContent, boolean count, String expectedWord) {
|
||||||
File logFolder =
|
File logFolder =
|
||||||
new File(yarnCluster.getNodeManager(0).getConfig()
|
new File(yarnCluster.getNodeManager(0).getConfig()
|
||||||
.get(YarnConfiguration.NM_LOG_DIRS,
|
.get(YarnConfiguration.NM_LOG_DIRS,
|
||||||
|
@ -387,9 +388,10 @@ public class TestDistributedShell {
|
||||||
File[] containerFiles =
|
File[] containerFiles =
|
||||||
listOfFiles[currentContainerLogFileIndex].listFiles();
|
listOfFiles[currentContainerLogFileIndex].listFiles();
|
||||||
|
|
||||||
|
int numOfWords = 0;
|
||||||
for (int i = 0; i < containerFiles.length; i++) {
|
for (int i = 0; i < containerFiles.length; i++) {
|
||||||
for (File output : containerFiles[i].listFiles()) {
|
for (File output : containerFiles[i].listFiles()) {
|
||||||
if (output.getName().trim().equalsIgnoreCase("stdout")) {
|
if (output.getName().trim().contains("stdout")) {
|
||||||
BufferedReader br = null;
|
BufferedReader br = null;
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
@ -398,9 +400,15 @@ public class TestDistributedShell {
|
||||||
br = new BufferedReader(new FileReader(output));
|
br = new BufferedReader(new FileReader(output));
|
||||||
int numOfline = 0;
|
int numOfline = 0;
|
||||||
while ((sCurrentLine = br.readLine()) != null) {
|
while ((sCurrentLine = br.readLine()) != null) {
|
||||||
Assert.assertEquals("The current is" + sCurrentLine,
|
if (count) {
|
||||||
expectedContent.get(numOfline), sCurrentLine.trim());
|
if (sCurrentLine.contains(expectedWord)) {
|
||||||
numOfline++;
|
numOfWords++;
|
||||||
|
}
|
||||||
|
} else if (output.getName().trim().equals("stdout")){
|
||||||
|
Assert.assertEquals("The current is" + sCurrentLine,
|
||||||
|
expectedContent.get(numOfline), sCurrentLine.trim());
|
||||||
|
numOfline++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -416,6 +424,7 @@ public class TestDistributedShell {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return numOfWords;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue