diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c633e901b2c..945e30e2d31 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -65,6 +65,9 @@ Release 2.1.2 - UNRELEASED YARN-1214. Register ClientToken MasterKey in SecretManager after it is saved (Jian He via bikas) + YARN-49. Improve distributed shell application to work on a secure cluster. + (Vinod Kumar Vavilapalli via hitesh) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 8914646cbe4..740a720ad00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -26,6 +26,7 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Vector; @@ -43,10 +44,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -73,6 +78,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -147,7 +153,7 @@ public class ApplicationMaster { // Handle to communicate with the Resource Manager @SuppressWarnings("rawtypes") - private AMRMClientAsync resourceManager; + private AMRMClientAsync amRMClient; // Handle to communicate with the Node Manager private NMClientAsync nmClientAsync; @@ -206,7 +212,9 @@ public class ApplicationMaster { private volatile boolean done; private volatile boolean success; - + + private ByteBuffer allTokens; + // Launch threads private List launchThreads = new ArrayList(); @@ -441,11 +449,24 @@ public class ApplicationMaster { public boolean run() throws YarnException, IOException { LOG.info("Starting ApplicationMaster"); + Credentials credentials = + UserGroupInformation.getCurrentUser().getCredentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + // Now remove the AM->RM token so that containers cannot access it. + Iterator> iter = credentials.getAllTokens().iterator(); + while (iter.hasNext()) { + Token token = iter.next(); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + iter.remove(); + } + } + allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); - resourceManager = - AMRMClientAsync.createAMRMClientAsync(1000, allocListener); - resourceManager.init(conf); - resourceManager.start(); + amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); + amRMClient.init(conf); + amRMClient.start(); containerListener = new NMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); @@ -460,7 +481,7 @@ public class ApplicationMaster { // Register self with ResourceManager // This will start heartbeating to the RM - RegisterApplicationMasterResponse response = resourceManager + RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl); // Dump out information about cluster capability as seen by the @@ -485,7 +506,7 @@ public class ApplicationMaster { // executed on them ( regardless of success/failure). for (int i = 0; i < numTotalContainers; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); - resourceManager.addContainerRequest(containerAsk); + amRMClient.addContainerRequest(containerAsk); } numRequestedContainers.set(numTotalContainers); @@ -535,7 +556,7 @@ public class ApplicationMaster { success = false; } try { - resourceManager.unregisterApplicationMaster(appStatus, appMessage, null); + amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); } catch (YarnException ex) { LOG.error("Failed to unregister application", ex); } catch (IOException e) { @@ -543,7 +564,7 @@ public class ApplicationMaster { } done = true; - resourceManager.stop(); + amRMClient.stop(); } private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { @@ -595,7 +616,7 @@ public class ApplicationMaster { if (askCount > 0) { for (int i = 0; i < askCount; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); - resourceManager.addContainerRequest(containerAsk); + amRMClient.addContainerRequest(containerAsk); } } @@ -651,7 +672,7 @@ public class ApplicationMaster { @Override public void onError(Throwable e) { done = true; - resourceManager.stop(); + amRMClient.stop(); } } @@ -807,6 +828,14 @@ public class ApplicationMaster { commands.add(command.toString()); ctx.setCommands(commands); + // Set up tokens for the container too. Today, for normal shell commands, + // the container in distribute-shell doesn't need any tokens. We are + // populating them mainly for NodeManagers to be able to download any + // files in the distributed file-system. The tokens are otherwise also + // useful in cases, for e.g., when one is running a "hadoop dfs" command + // inside the distributed shell. + ctx.setTokens(allTokens); + containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index e49c09aa365..7d51a6783f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,9 +40,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; -import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -543,8 +547,28 @@ public class Client { // Not needed in this scenario // amContainer.setServiceData(serviceData); - // The following are not required for launching an application master - // amContainer.setContainerId(containerId); + // Setup security tokens + if (UserGroupInformation.isSecurityEnabled()) { + Credentials credentials = new Credentials(); + String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); + if (tokenRenewer == null || tokenRenewer.length() == 0) { + throw new IOException( + "Can't get Master Kerberos principal for the RM to use as renewer"); + } + + // For now, only getting tokens for the default file-system. + final Token tokens[] = + fs.addDelegationTokens(tokenRenewer, credentials); + if (tokens != null) { + for (Token token : tokens) { + LOG.info("Got dt for " + fs.getUri() + "; " + token); + } + } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + amContainer.setTokens(fsTokens); + } appContext.setAMContainerSpec(amContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index beee423a47c..99ad5a90b49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -166,7 +166,8 @@ public class AMRMClientImpl extends AMRMClient { protected void serviceStart() throws Exception { final YarnConfiguration conf = new YarnConfiguration(getConfig()); try { - rmClient = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class); + rmClient = + ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class); } catch (IOException e) { throw new YarnRuntimeException(e); }