YARN-1435. Modified Distributed Shell to accept either the command or the custom script. Contributed by Xuan Gong.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1550867 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1182ca04d4
commit
d63cfdbf1a
@ -172,6 +172,9 @@ Release 2.4.0 - UNRELEASED
|
|||||||
service-address configuration are configured for every RM. (Xuan Gong via
|
service-address configuration are configured for every RM. (Xuan Gong via
|
||||||
vinodkv)
|
vinodkv)
|
||||||
|
|
||||||
|
YARN-1435. Modified Distributed Shell to accept either the command or the
|
||||||
|
custom script. (Xuan Gong via zjshen)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -218,13 +218,14 @@ public class ApplicationMaster {
|
|||||||
private long shellScriptPathLen = 0;
|
private long shellScriptPathLen = 0;
|
||||||
|
|
||||||
// Hardcoded path to shell script in launch container's local env
|
// Hardcoded path to shell script in launch container's local env
|
||||||
private final String ExecShellStringPath = "ExecShellScript.sh";
|
private static final String ExecShellStringPath = "ExecShellScript.sh";
|
||||||
|
private static final String ExecBatScripStringtPath = "ExecBatScript.bat";
|
||||||
|
|
||||||
// Hardcoded path to custom log_properties
|
// Hardcoded path to custom log_properties
|
||||||
private final String log4jPath = "log4j.properties";
|
private static final String log4jPath = "log4j.properties";
|
||||||
|
|
||||||
private final String shellCommandPath = "shellCommands";
|
private static final String shellCommandPath = "shellCommands";
|
||||||
private final String shellArgsPath = "shellArgs";
|
private static final String shellArgsPath = "shellArgs";
|
||||||
|
|
||||||
private volatile boolean done;
|
private volatile boolean done;
|
||||||
private volatile boolean success;
|
private volatile boolean success;
|
||||||
@ -234,6 +235,9 @@ public class ApplicationMaster {
|
|||||||
// Launch threads
|
// Launch threads
|
||||||
private List<Thread> launchThreads = new ArrayList<Thread>();
|
private List<Thread> launchThreads = new ArrayList<Thread>();
|
||||||
|
|
||||||
|
private final String linux_bash_command = "bash";
|
||||||
|
private final String windows_command = "cmd /c";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param args Command line args
|
* @param args Command line args
|
||||||
*/
|
*/
|
||||||
@ -308,8 +312,6 @@ public boolean init(String[] args) throws ParseException, IOException {
|
|||||||
Options opts = new Options();
|
Options opts = new Options();
|
||||||
opts.addOption("app_attempt_id", true,
|
opts.addOption("app_attempt_id", true,
|
||||||
"App Attempt ID. Not to be used unless for testing purposes");
|
"App Attempt ID. Not to be used unless for testing purposes");
|
||||||
opts.addOption("shell_script", true,
|
|
||||||
"Location of the shell script to be executed");
|
|
||||||
opts.addOption("shell_env", true,
|
opts.addOption("shell_env", true,
|
||||||
"Environment for shell script. Specified as env_key=env_val pairs");
|
"Environment for shell script. Specified as env_key=env_val pairs");
|
||||||
opts.addOption("container_memory", true,
|
opts.addOption("container_memory", true,
|
||||||
@ -387,11 +389,15 @@ public boolean init(String[] args) throws ParseException, IOException {
|
|||||||
+ appAttemptID.getApplicationId().getClusterTimestamp()
|
+ appAttemptID.getApplicationId().getClusterTimestamp()
|
||||||
+ ", attemptId=" + appAttemptID.getAttemptId());
|
+ ", attemptId=" + appAttemptID.getAttemptId());
|
||||||
|
|
||||||
if (!fileExist(shellCommandPath)) {
|
if (!fileExist(shellCommandPath)
|
||||||
|
&& envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"No shell command specified to be executed by application master");
|
"No shell command or shell script specified to be executed by application master");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fileExist(shellCommandPath)) {
|
||||||
|
shellCommand = readContent(shellCommandPath);
|
||||||
}
|
}
|
||||||
shellCommand = readContent(shellCommandPath);
|
|
||||||
|
|
||||||
if (fileExist(shellArgsPath)) {
|
if (fileExist(shellArgsPath)) {
|
||||||
shellArgs = readContent(shellArgsPath);
|
shellArgs = readContent(shellArgsPath);
|
||||||
@ -847,7 +853,9 @@ public void run() {
|
|||||||
}
|
}
|
||||||
shellRsrc.setTimestamp(shellScriptPathTimestamp);
|
shellRsrc.setTimestamp(shellScriptPathTimestamp);
|
||||||
shellRsrc.setSize(shellScriptPathLen);
|
shellRsrc.setSize(shellScriptPathLen);
|
||||||
localResources.put(ExecShellStringPath, shellRsrc);
|
localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
|
||||||
|
ExecShellStringPath, shellRsrc);
|
||||||
|
shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
|
||||||
}
|
}
|
||||||
ctx.setLocalResources(localResources);
|
ctx.setLocalResources(localResources);
|
||||||
|
|
||||||
@ -858,7 +866,8 @@ public void run() {
|
|||||||
vargs.add(shellCommand);
|
vargs.add(shellCommand);
|
||||||
// Set shell script path
|
// Set shell script path
|
||||||
if (!shellScriptPath.isEmpty()) {
|
if (!shellScriptPath.isEmpty()) {
|
||||||
vargs.add(ExecShellStringPath);
|
vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
|
||||||
|
: ExecShellStringPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set args for the shell command if any
|
// Set args for the shell command if any
|
||||||
|
@ -49,6 +49,7 @@
|
|||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||||
@ -167,11 +168,14 @@ public class Client {
|
|||||||
// Command line options
|
// Command line options
|
||||||
private Options opts;
|
private Options opts;
|
||||||
|
|
||||||
private final String shellCommandPath = "shellCommands";
|
private static final String shellCommandPath = "shellCommands";
|
||||||
private final String shellArgsPath = "shellArgs";
|
private static final String shellArgsPath = "shellArgs";
|
||||||
private final String appMasterJarPath = "AppMaster.jar";
|
private static final String appMasterJarPath = "AppMaster.jar";
|
||||||
// Hardcoded path to custom log_properties
|
// Hardcoded path to custom log_properties
|
||||||
private final String log4jPath = "log4j.properties";
|
private static final String log4jPath = "log4j.properties";
|
||||||
|
|
||||||
|
private static final String linuxShellPath = "ExecShellScript.sh";
|
||||||
|
private static final String windowBatPath = "ExecBatScript.bat";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param args Command line arguments
|
* @param args Command line arguments
|
||||||
@ -225,8 +229,11 @@ public Client(Configuration conf) throws Exception {
|
|||||||
opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
|
opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
|
||||||
opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
|
opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
|
||||||
opts.addOption("jar", true, "Jar file containing the application master");
|
opts.addOption("jar", true, "Jar file containing the application master");
|
||||||
opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
|
opts.addOption("shell_command", true, "Shell command to be executed by " +
|
||||||
opts.addOption("shell_script", true, "Location of the shell script to be executed");
|
"the Application Master. Can only specify either --shell_command " +
|
||||||
|
"or --shell_script");
|
||||||
|
opts.addOption("shell_script", true, "Location of the shell script to be " +
|
||||||
|
"executed. Can only specify either --shell_command or --shell_script");
|
||||||
opts.addOption("shell_args", true, "Command line args for the shell script." +
|
opts.addOption("shell_args", true, "Command line args for the shell script." +
|
||||||
"Multiple args can be separated by empty space.");
|
"Multiple args can be separated by empty space.");
|
||||||
opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
|
opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
|
||||||
@ -308,12 +315,15 @@ public boolean init(String[] args) throws ParseException {
|
|||||||
|
|
||||||
appMasterJar = cliParser.getOptionValue("jar");
|
appMasterJar = cliParser.getOptionValue("jar");
|
||||||
|
|
||||||
if (!cliParser.hasOption("shell_command")) {
|
if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) {
|
||||||
throw new IllegalArgumentException("No shell command specified to be executed by application master");
|
throw new IllegalArgumentException(
|
||||||
}
|
"No shell command or shell script specified to be executed by application master");
|
||||||
shellCommand = cliParser.getOptionValue("shell_command");
|
} else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) {
|
||||||
|
throw new IllegalArgumentException("Can not specify shell_command option " +
|
||||||
if (cliParser.hasOption("shell_script")) {
|
"and shell_script option at the same time");
|
||||||
|
} else if (cliParser.hasOption("shell_command")) {
|
||||||
|
shellCommand = cliParser.getOptionValue("shell_command");
|
||||||
|
} else {
|
||||||
shellScriptPath = cliParser.getOptionValue("shell_script");
|
shellScriptPath = cliParser.getOptionValue("shell_script");
|
||||||
}
|
}
|
||||||
if (cliParser.hasOption("shell_args")) {
|
if (cliParser.hasOption("shell_args")) {
|
||||||
@ -466,8 +476,11 @@ public boolean run() throws IOException, YarnException {
|
|||||||
long hdfsShellScriptTimestamp = 0;
|
long hdfsShellScriptTimestamp = 0;
|
||||||
if (!shellScriptPath.isEmpty()) {
|
if (!shellScriptPath.isEmpty()) {
|
||||||
Path shellSrc = new Path(shellScriptPath);
|
Path shellSrc = new Path(shellScriptPath);
|
||||||
String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
|
String shellPathSuffix =
|
||||||
Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
|
appName + "/" + appId.getId() + "/"
|
||||||
|
+ (Shell.WINDOWS ? windowBatPath : linuxShellPath);
|
||||||
|
Path shellDst =
|
||||||
|
new Path(fs.getHomeDirectory(), shellPathSuffix);
|
||||||
fs.copyFromLocalFile(false, true, shellSrc, shellDst);
|
fs.copyFromLocalFile(false, true, shellSrc, shellDst);
|
||||||
hdfsShellScriptLocation = shellDst.toUri().toString();
|
hdfsShellScriptLocation = shellDst.toUri().toString();
|
||||||
FileStatus shellFileStatus = fs.getFileStatus(shellDst);
|
FileStatus shellFileStatus = fs.getFileStatus(shellDst);
|
||||||
|
@ -303,6 +303,54 @@ public void testDSShellWithMultipleArgs() throws Exception {
|
|||||||
verifyContainerLog(4, expectedContent, false, "");
|
verifyContainerLog(4, expectedContent, false, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=90000)
|
||||||
|
public void testDSShellWithShellScript() throws Exception {
|
||||||
|
final File basedir =
|
||||||
|
new File("target", TestDistributedShell.class.getName());
|
||||||
|
final File tmpDir = new File(basedir, "tmpDir");
|
||||||
|
tmpDir.mkdirs();
|
||||||
|
final File customShellScript = new File(tmpDir, "custom_script.sh");
|
||||||
|
if (customShellScript.exists()) {
|
||||||
|
customShellScript.delete();
|
||||||
|
}
|
||||||
|
if (!customShellScript.createNewFile()) {
|
||||||
|
Assert.fail("Can not create custom shell script file.");
|
||||||
|
}
|
||||||
|
PrintWriter fileWriter = new PrintWriter(customShellScript);
|
||||||
|
// set the output to DEBUG level
|
||||||
|
fileWriter.write("echo testDSShellWithShellScript");
|
||||||
|
fileWriter.close();
|
||||||
|
System.out.println(customShellScript.getAbsolutePath());
|
||||||
|
String[] args = {
|
||||||
|
"--jar",
|
||||||
|
APPMASTER_JAR,
|
||||||
|
"--num_containers",
|
||||||
|
"1",
|
||||||
|
"--shell_script",
|
||||||
|
customShellScript.getAbsolutePath(),
|
||||||
|
"--master_memory",
|
||||||
|
"512",
|
||||||
|
"--master_vcores",
|
||||||
|
"2",
|
||||||
|
"--container_memory",
|
||||||
|
"128",
|
||||||
|
"--container_vcores",
|
||||||
|
"1"
|
||||||
|
};
|
||||||
|
|
||||||
|
LOG.info("Initializing DS Client");
|
||||||
|
final Client client =
|
||||||
|
new Client(new Configuration(yarnCluster.getConfig()));
|
||||||
|
boolean initSuccess = client.init(args);
|
||||||
|
Assert.assertTrue(initSuccess);
|
||||||
|
LOG.info("Running DS Client");
|
||||||
|
boolean result = client.run();
|
||||||
|
LOG.info("Client run completed. Result=" + result);
|
||||||
|
List<String> expectedContent = new ArrayList<String>();
|
||||||
|
expectedContent.add("testDSShellWithShellScript");
|
||||||
|
verifyContainerLog(1, expectedContent, false, "");
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=90000)
|
@Test(timeout=90000)
|
||||||
public void testDSShellWithInvalidArgs() throws Exception {
|
public void testDSShellWithInvalidArgs() throws Exception {
|
||||||
Client client = new Client(new Configuration(yarnCluster.getConfig()));
|
Client client = new Client(new Configuration(yarnCluster.getConfig()));
|
||||||
@ -399,6 +447,58 @@ public void testDSShellWithInvalidArgs() throws Exception {
|
|||||||
Assert.assertTrue("The throw exception is not expected",
|
Assert.assertTrue("The throw exception is not expected",
|
||||||
e.getMessage().contains("Invalid virtual cores specified"));
|
e.getMessage().contains("Invalid virtual cores specified"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG.info("Initializing DS Client with --shell_command and --shell_script");
|
||||||
|
try {
|
||||||
|
String[] args = {
|
||||||
|
"--jar",
|
||||||
|
APPMASTER_JAR,
|
||||||
|
"--num_containers",
|
||||||
|
"2",
|
||||||
|
"--shell_command",
|
||||||
|
Shell.WINDOWS ? "dir" : "ls",
|
||||||
|
"--master_memory",
|
||||||
|
"512",
|
||||||
|
"--master_vcores",
|
||||||
|
"2",
|
||||||
|
"--container_memory",
|
||||||
|
"128",
|
||||||
|
"--container_vcores",
|
||||||
|
"1",
|
||||||
|
"--shell_script",
|
||||||
|
"test.sh"
|
||||||
|
};
|
||||||
|
client.init(args);
|
||||||
|
Assert.fail("Exception is expected");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
Assert.assertTrue("The throw exception is not expected",
|
||||||
|
e.getMessage().contains("Can not specify shell_command option " +
|
||||||
|
"and shell_script option at the same time"));
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Initializing DS Client without --shell_command and --shell_script");
|
||||||
|
try {
|
||||||
|
String[] args = {
|
||||||
|
"--jar",
|
||||||
|
APPMASTER_JAR,
|
||||||
|
"--num_containers",
|
||||||
|
"2",
|
||||||
|
"--master_memory",
|
||||||
|
"512",
|
||||||
|
"--master_vcores",
|
||||||
|
"2",
|
||||||
|
"--container_memory",
|
||||||
|
"128",
|
||||||
|
"--container_vcores",
|
||||||
|
"1"
|
||||||
|
};
|
||||||
|
client.init(args);
|
||||||
|
Assert.fail("Exception is expected");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
Assert.assertTrue("The throw exception is not expected",
|
||||||
|
e.getMessage().contains("No shell command or shell script specified " +
|
||||||
|
"to be executed by application master"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static void waitForNMToRegister(NodeManager nm)
|
protected static void waitForNMToRegister(NodeManager nm)
|
||||||
@ -490,10 +590,10 @@ private int verifyContainerLog(int containerNum,
|
|||||||
for (File output : containerFiles[i].listFiles()) {
|
for (File output : containerFiles[i].listFiles()) {
|
||||||
if (output.getName().trim().contains("stdout")) {
|
if (output.getName().trim().contains("stdout")) {
|
||||||
BufferedReader br = null;
|
BufferedReader br = null;
|
||||||
|
List<String> stdOutContent = new ArrayList<String>();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
String sCurrentLine;
|
String sCurrentLine;
|
||||||
|
|
||||||
br = new BufferedReader(new FileReader(output));
|
br = new BufferedReader(new FileReader(output));
|
||||||
int numOfline = 0;
|
int numOfline = 0;
|
||||||
while ((sCurrentLine = br.readLine()) != null) {
|
while ((sCurrentLine = br.readLine()) != null) {
|
||||||
@ -502,12 +602,25 @@ private int verifyContainerLog(int containerNum,
|
|||||||
numOfWords++;
|
numOfWords++;
|
||||||
}
|
}
|
||||||
} else if (output.getName().trim().equals("stdout")){
|
} else if (output.getName().trim().equals("stdout")){
|
||||||
Assert.assertEquals("The current is" + sCurrentLine,
|
if (! Shell.WINDOWS) {
|
||||||
expectedContent.get(numOfline), sCurrentLine.trim());
|
Assert.assertEquals("The current is" + sCurrentLine,
|
||||||
numOfline++;
|
expectedContent.get(numOfline), sCurrentLine.trim());
|
||||||
|
numOfline++;
|
||||||
|
} else {
|
||||||
|
stdOutContent.add(sCurrentLine.trim());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/* By executing bat script using cmd /c,
|
||||||
|
* it will output all contents from bat script first
|
||||||
|
* It is hard for us to do check line by line
|
||||||
|
* Simply check whether output from bat file contains
|
||||||
|
* all the expected messages
|
||||||
|
*/
|
||||||
|
if (Shell.WINDOWS && !count
|
||||||
|
&& output.getName().trim().equals("stdout")) {
|
||||||
|
Assert.assertTrue(stdOutContent.containsAll(expectedContent));
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
} finally {
|
} finally {
|
||||||
@ -523,6 +636,5 @@ private int verifyContainerLog(int containerNum,
|
|||||||
}
|
}
|
||||||
return numOfWords;
|
return numOfWords;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user