YARN-1303. Fixed DistributedShell to not fail with multiple commands separated by a semi-colon as shell-command. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1544023 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1544024 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eb97badbcd
commit
8f5ec0f5f8
@ -96,6 +96,9 @@ Release 2.3.0 - UNRELEASED
|
|||||||
YARN-584. In scheduler web UIs, queues unexpand on refresh. (Harshit
|
YARN-584. In scheduler web UIs, queues unexpand on refresh. (Harshit
|
||||||
Daga via Sandy Ryza)
|
Daga via Sandy Ryza)
|
||||||
|
|
||||||
|
YARN-1303. Fixed DistributedShell to not fail with multiple commands separated
|
||||||
|
by a semi-colon as shell-command. (Xuan Gong via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -19,8 +19,9 @@
|
|||||||
package org.apache.hadoop.yarn.applications.distributedshell;
|
package org.apache.hadoop.yarn.applications.distributedshell;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
@ -376,7 +377,16 @@ public boolean init(String[] args) throws ParseException, IOException {
|
|||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"No shell command specified to be executed by application master");
|
"No shell command specified to be executed by application master");
|
||||||
}
|
}
|
||||||
shellCommand = cliParser.getOptionValue("shell_command");
|
String shellCommandPath = cliParser.getOptionValue("shell_command");
|
||||||
|
FileInputStream fs = null;
|
||||||
|
DataInputStream ds = null;
|
||||||
|
try {
|
||||||
|
ds = new DataInputStream(new FileInputStream(shellCommandPath));
|
||||||
|
shellCommand = ds.readUTF();
|
||||||
|
} finally {
|
||||||
|
org.apache.commons.io.IOUtils.closeQuietly(ds);
|
||||||
|
org.apache.commons.io.IOUtils.closeQuietly(fs);
|
||||||
|
}
|
||||||
|
|
||||||
if (cliParser.hasOption("shell_args")) {
|
if (cliParser.hasOption("shell_args")) {
|
||||||
shellArgs = cliParser.getOptionValue("shell_args");
|
shellArgs = cliParser.getOptionValue("shell_args");
|
||||||
|
@ -32,14 +32,17 @@
|
|||||||
import org.apache.commons.cli.HelpFormatter;
|
import org.apache.commons.cli.HelpFormatter;
|
||||||
import org.apache.commons.cli.Options;
|
import org.apache.commons.cli.Options;
|
||||||
import org.apache.commons.cli.ParseException;
|
import org.apache.commons.cli.ParseException;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
@ -131,6 +134,7 @@ public class Client {
|
|||||||
|
|
||||||
// Shell command to be executed
|
// Shell command to be executed
|
||||||
private String shellCommand = "";
|
private String shellCommand = "";
|
||||||
|
private final String shellCommandPath = "shellCommands";
|
||||||
// Location of shell script
|
// Location of shell script
|
||||||
private String shellScriptPath = "";
|
private String shellScriptPath = "";
|
||||||
// Args to be passed to the shell command
|
// Args to be passed to the shell command
|
||||||
@ -483,6 +487,29 @@ public boolean run() throws IOException, YarnException {
|
|||||||
hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
|
hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!shellCommand.isEmpty()) {
|
||||||
|
String shellCommandSuffix =
|
||||||
|
appName + "/" + appId.getId() + "/" + shellCommandPath;
|
||||||
|
Path shellCommandDst =
|
||||||
|
new Path(fs.getHomeDirectory(), shellCommandSuffix);
|
||||||
|
FSDataOutputStream ostream = null;
|
||||||
|
try {
|
||||||
|
ostream = FileSystem
|
||||||
|
.create(fs, shellCommandDst, new FsPermission((short) 0710));
|
||||||
|
ostream.writeUTF(shellCommand);
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeQuietly(ostream);
|
||||||
|
}
|
||||||
|
FileStatus scFileStatus = fs.getFileStatus(shellCommandDst);
|
||||||
|
LocalResource scRsrc = Records.newRecord(LocalResource.class);
|
||||||
|
scRsrc.setType(LocalResourceType.FILE);
|
||||||
|
scRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||||
|
scRsrc.setResource(ConverterUtils.getYarnUrlFromURI(shellCommandDst
|
||||||
|
.toUri()));
|
||||||
|
scRsrc.setTimestamp(scFileStatus.getModificationTime());
|
||||||
|
scRsrc.setSize(scFileStatus.getLen());
|
||||||
|
localResources.put("shellCommands", scRsrc);
|
||||||
|
}
|
||||||
// Set local resource info into app master container launch context
|
// Set local resource info into app master container launch context
|
||||||
amContainer.setLocalResources(localResources);
|
amContainer.setLocalResources(localResources);
|
||||||
|
|
||||||
@ -541,8 +568,9 @@ public boolean run() throws IOException, YarnException {
|
|||||||
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
|
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
|
||||||
vargs.add("--num_containers " + String.valueOf(numContainers));
|
vargs.add("--num_containers " + String.valueOf(numContainers));
|
||||||
vargs.add("--priority " + String.valueOf(shellCmdPriority));
|
vargs.add("--priority " + String.valueOf(shellCmdPriority));
|
||||||
|
|
||||||
if (!shellCommand.isEmpty()) {
|
if (!shellCommand.isEmpty()) {
|
||||||
vargs.add("--shell_command " + shellCommand + "");
|
vargs.add("--shell_command " + shellCommandPath + "");
|
||||||
}
|
}
|
||||||
if (!shellArgs.isEmpty()) {
|
if (!shellArgs.isEmpty()) {
|
||||||
vargs.add("--shell_args " + shellArgs + "");
|
vargs.add("--shell_args " + shellArgs + "");
|
||||||
|
@ -18,12 +18,15 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.applications.distributedshell;
|
package org.apache.hadoop.yarn.applications.distributedshell;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.FileReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
@ -170,6 +173,38 @@ public void run() {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=90000)
|
||||||
|
public void testDSShellWithCommands() throws Exception {
|
||||||
|
String[] args = {
|
||||||
|
"--jar",
|
||||||
|
APPMASTER_JAR,
|
||||||
|
"--num_containers",
|
||||||
|
"2",
|
||||||
|
"--shell_command",
|
||||||
|
"echo HADOOP YARN MAPREDUCE|wc -w",
|
||||||
|
"--master_memory",
|
||||||
|
"512",
|
||||||
|
"--master_vcores",
|
||||||
|
"2",
|
||||||
|
"--container_memory",
|
||||||
|
"128",
|
||||||
|
"--container_vcores",
|
||||||
|
"1"
|
||||||
|
};
|
||||||
|
|
||||||
|
LOG.info("Initializing DS Client");
|
||||||
|
final Client client =
|
||||||
|
new Client(new Configuration(yarnCluster.getConfig()));
|
||||||
|
boolean initSuccess = client.init(args);
|
||||||
|
Assert.assertTrue(initSuccess);
|
||||||
|
LOG.info("Running DS Client");
|
||||||
|
boolean result = client.run();
|
||||||
|
LOG.info("Client run completed. Result=" + result);
|
||||||
|
List<String> expectedContent = new ArrayList<String>();
|
||||||
|
expectedContent.add("3");
|
||||||
|
verifyContainerLog(2, expectedContent);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=90000)
|
@Test(timeout=90000)
|
||||||
public void testDSShellWithInvalidArgs() throws Exception {
|
public void testDSShellWithInvalidArgs() throws Exception {
|
||||||
Client client = new Client(new Configuration(yarnCluster.getConfig()));
|
Client client = new Client(new Configuration(yarnCluster.getConfig()));
|
||||||
@ -332,5 +367,56 @@ public void testDebugFlag() throws Exception {
|
|||||||
LOG.info("Running DS Client");
|
LOG.info("Running DS Client");
|
||||||
Assert.assertTrue(client.run());
|
Assert.assertTrue(client.run());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void
|
||||||
|
verifyContainerLog(int containerNum, List<String> expectedContent) {
|
||||||
|
File logFolder =
|
||||||
|
new File(yarnCluster.getNodeManager(0).getConfig()
|
||||||
|
.get(YarnConfiguration.NM_LOG_DIRS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_LOG_DIRS));
|
||||||
|
|
||||||
|
File[] listOfFiles = logFolder.listFiles();
|
||||||
|
int currentContainerLogFileIndex = -1;
|
||||||
|
for (int i = listOfFiles.length - 1; i >= 0; i--) {
|
||||||
|
if (listOfFiles[i].listFiles().length == containerNum + 1) {
|
||||||
|
currentContainerLogFileIndex = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertTrue(currentContainerLogFileIndex != -1);
|
||||||
|
File[] containerFiles =
|
||||||
|
listOfFiles[currentContainerLogFileIndex].listFiles();
|
||||||
|
|
||||||
|
for (int i = 0; i < containerFiles.length; i++) {
|
||||||
|
for (File output : containerFiles[i].listFiles()) {
|
||||||
|
if (output.getName().trim().equalsIgnoreCase("stdout")) {
|
||||||
|
BufferedReader br = null;
|
||||||
|
try {
|
||||||
|
|
||||||
|
String sCurrentLine;
|
||||||
|
|
||||||
|
br = new BufferedReader(new FileReader(output));
|
||||||
|
int numOfline = 0;
|
||||||
|
while ((sCurrentLine = br.readLine()) != null) {
|
||||||
|
Assert.assertEquals("The current is" + sCurrentLine,
|
||||||
|
expectedContent.get(numOfline), sCurrentLine.trim());
|
||||||
|
numOfline++;
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
if (br != null)
|
||||||
|
br.close();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user