YARN-1303. Fixed DistributedShell to not fail with multiple commands separated by a semi-colon as shell-command. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1544023 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-11-21 03:47:02 +00:00
parent 5f458ef23f
commit ad558cf2b3
4 changed files with 130 additions and 3 deletions

View File

@ -114,6 +114,9 @@ Release 2.3.0 - UNRELEASED
YARN-584. In scheduler web UIs, queues unexpand on refresh. (Harshit
Daga via Sandy Ryza)
YARN-1303. Fixed DistributedShell to not fail with multiple commands separated
by a semi-colon as shell-command. (Xuan Gong via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -19,8 +19,9 @@
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
@ -376,7 +377,16 @@ public class ApplicationMaster {
throw new IllegalArgumentException(
"No shell command specified to be executed by application master");
}
shellCommand = cliParser.getOptionValue("shell_command");
String shellCommandPath = cliParser.getOptionValue("shell_command");
FileInputStream fs = null;
DataInputStream ds = null;
try {
ds = new DataInputStream(new FileInputStream(shellCommandPath));
shellCommand = ds.readUTF();
} finally {
org.apache.commons.io.IOUtils.closeQuietly(ds);
org.apache.commons.io.IOUtils.closeQuietly(fs);
}
if (cliParser.hasOption("shell_args")) {
shellArgs = cliParser.getOptionValue("shell_args");

View File

@ -32,14 +32,17 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@ -131,6 +134,7 @@ public class Client {
// Shell command to be executed
private String shellCommand = "";
private final String shellCommandPath = "shellCommands";
// Location of shell script
private String shellScriptPath = "";
// Args to be passed to the shell command
@ -483,6 +487,29 @@ public class Client {
hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
}
if (!shellCommand.isEmpty()) {
String shellCommandSuffix =
appName + "/" + appId.getId() + "/" + shellCommandPath;
Path shellCommandDst =
new Path(fs.getHomeDirectory(), shellCommandSuffix);
FSDataOutputStream ostream = null;
try {
ostream = FileSystem
.create(fs, shellCommandDst, new FsPermission((short) 0710));
ostream.writeUTF(shellCommand);
} finally {
IOUtils.closeQuietly(ostream);
}
FileStatus scFileStatus = fs.getFileStatus(shellCommandDst);
LocalResource scRsrc = Records.newRecord(LocalResource.class);
scRsrc.setType(LocalResourceType.FILE);
scRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
scRsrc.setResource(ConverterUtils.getYarnUrlFromURI(shellCommandDst
.toUri()));
scRsrc.setTimestamp(scFileStatus.getModificationTime());
scRsrc.setSize(scFileStatus.getLen());
localResources.put("shellCommands", scRsrc);
}
// Set local resource info into app master container launch context
amContainer.setLocalResources(localResources);
@ -541,8 +568,9 @@ public class Client {
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
if (!shellCommand.isEmpty()) {
vargs.add("--shell_command " + shellCommand + "");
vargs.add("--shell_command " + shellCommandPath + "");
}
if (!shellArgs.isEmpty()) {
vargs.add("--shell_args " + shellArgs + "");

View File

@ -18,12 +18,15 @@
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -170,6 +173,38 @@ public class TestDistributedShell {
}
@Test(timeout=90000)
public void testDSShellWithCommands() throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"2",
"--shell_command",
"echo HADOOP YARN MAPREDUCE|wc -w",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1"
};
LOG.info("Initializing DS Client");
final Client client =
new Client(new Configuration(yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
List<String> expectedContent = new ArrayList<String>();
expectedContent.add("3");
verifyContainerLog(2, expectedContent);
}
@Test(timeout=90000)
public void testDSShellWithInvalidArgs() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig()));
@ -332,5 +367,56 @@ public class TestDistributedShell {
LOG.info("Running DS Client");
Assert.assertTrue(client.run());
}
private void
verifyContainerLog(int containerNum, List<String> expectedContent) {
File logFolder =
new File(yarnCluster.getNodeManager(0).getConfig()
.get(YarnConfiguration.NM_LOG_DIRS,
YarnConfiguration.DEFAULT_NM_LOG_DIRS));
File[] listOfFiles = logFolder.listFiles();
int currentContainerLogFileIndex = -1;
for (int i = listOfFiles.length - 1; i >= 0; i--) {
if (listOfFiles[i].listFiles().length == containerNum + 1) {
currentContainerLogFileIndex = i;
break;
}
}
Assert.assertTrue(currentContainerLogFileIndex != -1);
File[] containerFiles =
listOfFiles[currentContainerLogFileIndex].listFiles();
for (int i = 0; i < containerFiles.length; i++) {
for (File output : containerFiles[i].listFiles()) {
if (output.getName().trim().equalsIgnoreCase("stdout")) {
BufferedReader br = null;
try {
String sCurrentLine;
br = new BufferedReader(new FileReader(output));
int numOfline = 0;
while ((sCurrentLine = br.readLine()) != null) {
Assert.assertEquals("The current is" + sCurrentLine,
expectedContent.get(numOfline), sCurrentLine.trim());
numOfline++;
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (br != null)
br.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
}
}
}