YARN-1908. Fixed DistributedShell to not fail in secure clusters. Contributed by Vinod Kumar Vavilapalli and Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1585849 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-04-08 22:11:52 +00:00
parent 7915b36225
commit 6c2a0ce30b
2 changed files with 53 additions and 18 deletions

View File

@ -85,6 +85,9 @@ Release 2.4.1 - UNRELEASED
YARN-1905. TestProcfsBasedProcessTree must only run on Linux. (cnauroth) YARN-1905. TestProcfsBasedProcessTree must only run on Linux. (cnauroth)
YARN-1908. Fixed DistributedShell to not fail in secure clusters. (Vinod
Kumar Vavilapalli and Jian He via vinodkv)
Release 2.4.0 - 2014-04-07 Release 2.4.0 - 2014-04-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -27,6 +27,7 @@
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -91,7 +92,6 @@
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -185,6 +185,9 @@ public static enum DSEntity {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private AMRMClientAsync amRMClient; private AMRMClientAsync amRMClient;
// In both secure and non-secure modes, this points to the job-submitter.
private UserGroupInformation appSubmitterUgi;
// Handle to communicate with the Node Manager // Handle to communicate with the Node Manager
private NMClientAsync nmClientAsync; private NMClientAsync nmClientAsync;
// Listen to process the response from the Node Manager // Listen to process the response from the Node Manager
@ -236,7 +239,7 @@ public static enum DSEntity {
// Location of shell script ( obtained from info set in env ) // Location of shell script ( obtained from info set in env )
// Shell script path in fs // Shell script path in fs
private String shellScriptPath = ""; private String scriptPath = "";
// Timestamp needed for creating a local resource // Timestamp needed for creating a local resource
private long shellScriptPathTimestamp = 0; private long shellScriptPathTimestamp = 0;
// File length needed for local resource // File length needed for local resource
@ -451,7 +454,7 @@ public boolean init(String[] args) throws ParseException, IOException {
} }
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
shellScriptPathTimestamp = Long.valueOf(envs shellScriptPathTimestamp = Long.valueOf(envs
@ -462,10 +465,10 @@ public boolean init(String[] args) throws ParseException, IOException {
.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
} }
if (!shellScriptPath.isEmpty() if (!scriptPath.isEmpty()
&& (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
LOG.error("Illegal values in env for shell script path" + ", path=" LOG.error("Illegal values in env for shell script path" + ", path="
+ shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp=" + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
+ shellScriptPathTimestamp); + shellScriptPathTimestamp);
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Illegal values in env for shell script path"); "Illegal values in env for shell script path");
@ -525,14 +528,23 @@ public void run() throws YarnException, IOException {
credentials.writeTokenStorageToStream(dob); credentials.writeTokenStorageToStream(dob);
// Now remove the AM->RM token so that containers cannot access it. // Now remove the AM->RM token so that containers cannot access it.
Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
LOG.info("Executing with tokens:");
while (iter.hasNext()) { while (iter.hasNext()) {
Token<?> token = iter.next(); Token<?> token = iter.next();
LOG.info(token);
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
iter.remove(); iter.remove();
} }
} }
allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
// Create appSubmitterUgi and add original tokens to it
String appSubmitterUserName =
System.getenv(ApplicationConstants.Environment.USER.name());
appSubmitterUgi =
UserGroupInformation.createRemoteUser(appSubmitterUserName);
appSubmitterUgi.addCredentials(credentials);
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf); amRMClient.init(conf);
@ -901,19 +913,26 @@ public void run() {
// resources too. // resources too.
// In this scenario, if a shell script is specified, we need to have it // In this scenario, if a shell script is specified, we need to have it
// copied and made available to the container. // copied and made available to the container.
if (!shellScriptPath.isEmpty()) { if (!scriptPath.isEmpty()) {
Path renamedSchellScriptPath = null; Path renamedScriptPath = null;
if (Shell.WINDOWS) { if (Shell.WINDOWS) {
renamedSchellScriptPath = new Path(shellScriptPath + ".bat"); renamedScriptPath = new Path(scriptPath + ".bat");
} else { } else {
renamedSchellScriptPath = new Path(shellScriptPath + ".sh"); renamedScriptPath = new Path(scriptPath + ".sh");
} }
try { try {
FileSystem fs = renamedSchellScriptPath.getFileSystem(conf); // rename the script file based on the underlying OS syntax.
fs.rename(new Path(shellScriptPath), renamedSchellScriptPath); renameScriptFile(renamedScriptPath);
} catch (IOException e) { } catch (Exception e) {
LOG.warn("Not able to add suffix (.bat/.sh) to the shell script filename"); LOG.error(
throw new YarnRuntimeException(e); "Not able to add suffix (.bat/.sh) to the shell script filename",
e);
// We know we cannot continue launching the container
// so we should release it.
numCompletedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
return;
} }
LocalResource shellRsrc = Records.newRecord(LocalResource.class); LocalResource shellRsrc = Records.newRecord(LocalResource.class);
@ -921,11 +940,10 @@ public void run() {
shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION); shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
try { try {
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI( shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
renamedSchellScriptPath.toString()))); renamedScriptPath.toString())));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
LOG.error("Error when trying to use shell script path specified" LOG.error("Error when trying to use shell script path specified"
+ " in env, path=" + renamedSchellScriptPath); + " in env, path=" + renamedScriptPath, e);
e.printStackTrace();
// A failure scenario on bad input such as invalid shell script path // A failure scenario on bad input such as invalid shell script path
// We know we cannot continue launching the container // We know we cannot continue launching the container
@ -949,7 +967,7 @@ public void run() {
// Set executable command // Set executable command
vargs.add(shellCommand); vargs.add(shellCommand);
// Set shell script path // Set shell script path
if (!shellScriptPath.isEmpty()) { if (!scriptPath.isEmpty()) {
vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
: ExecShellStringPath); : ExecShellStringPath);
} }
@ -983,6 +1001,20 @@ public void run() {
} }
} }
private void renameScriptFile(final Path renamedScriptPath)
throws IOException, InterruptedException {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException {
FileSystem fs = renamedScriptPath.getFileSystem(conf);
fs.rename(new Path(scriptPath), renamedScriptPath);
return null;
}
});
LOG.info("User " + appSubmitterUgi.getUserName()
+ " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
}
/** /**
* Setup the request that will be sent to the RM for the container ask. * Setup the request that will be sent to the RM for the container ask.
* *