YARN-49. Improve distributed shell application to work on a secure cluster. Contributed by Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1526330 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3582026204
commit
42c3cd3d13
|
@ -80,6 +80,9 @@ Release 2.1.2 - UNRELEASED
|
||||||
YARN-1214. Register ClientToken MasterKey in SecretManager after it is
|
YARN-1214. Register ClientToken MasterKey in SecretManager after it is
|
||||||
saved (Jian He via bikas)
|
saved (Jian He via bikas)
|
||||||
|
|
||||||
|
YARN-49. Improve distributed shell application to work on a secure cluster.
|
||||||
|
(Vinod Kumar Vavilapalli via hitesh)
|
||||||
|
|
||||||
Release 2.1.1-beta - 2013-09-23
|
Release 2.1.1-beta - 2013-09-23
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.net.URISyntaxException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Vector;
|
import java.util.Vector;
|
||||||
|
@ -43,10 +44,14 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
@ -73,6 +78,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
|
||||||
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
|
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
@ -147,7 +153,7 @@ public class ApplicationMaster {
|
||||||
|
|
||||||
// Handle to communicate with the Resource Manager
|
// Handle to communicate with the Resource Manager
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
private AMRMClientAsync resourceManager;
|
private AMRMClientAsync amRMClient;
|
||||||
|
|
||||||
// Handle to communicate with the Node Manager
|
// Handle to communicate with the Node Manager
|
||||||
private NMClientAsync nmClientAsync;
|
private NMClientAsync nmClientAsync;
|
||||||
|
@ -206,7 +212,9 @@ public class ApplicationMaster {
|
||||||
|
|
||||||
private volatile boolean done;
|
private volatile boolean done;
|
||||||
private volatile boolean success;
|
private volatile boolean success;
|
||||||
|
|
||||||
|
private ByteBuffer allTokens;
|
||||||
|
|
||||||
// Launch threads
|
// Launch threads
|
||||||
private List<Thread> launchThreads = new ArrayList<Thread>();
|
private List<Thread> launchThreads = new ArrayList<Thread>();
|
||||||
|
|
||||||
|
@ -441,11 +449,24 @@ public class ApplicationMaster {
|
||||||
public boolean run() throws YarnException, IOException {
|
public boolean run() throws YarnException, IOException {
|
||||||
LOG.info("Starting ApplicationMaster");
|
LOG.info("Starting ApplicationMaster");
|
||||||
|
|
||||||
|
Credentials credentials =
|
||||||
|
UserGroupInformation.getCurrentUser().getCredentials();
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
credentials.writeTokenStorageToStream(dob);
|
||||||
|
// Now remove the AM->RM token so that containers cannot access it.
|
||||||
|
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
Token<?> token = iter.next();
|
||||||
|
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
|
||||||
|
iter.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
|
|
||||||
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
||||||
resourceManager =
|
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
|
||||||
AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
|
amRMClient.init(conf);
|
||||||
resourceManager.init(conf);
|
amRMClient.start();
|
||||||
resourceManager.start();
|
|
||||||
|
|
||||||
containerListener = new NMCallbackHandler();
|
containerListener = new NMCallbackHandler();
|
||||||
nmClientAsync = new NMClientAsyncImpl(containerListener);
|
nmClientAsync = new NMClientAsyncImpl(containerListener);
|
||||||
|
@ -460,7 +481,7 @@ public class ApplicationMaster {
|
||||||
|
|
||||||
// Register self with ResourceManager
|
// Register self with ResourceManager
|
||||||
// This will start heartbeating to the RM
|
// This will start heartbeating to the RM
|
||||||
RegisterApplicationMasterResponse response = resourceManager
|
RegisterApplicationMasterResponse response = amRMClient
|
||||||
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
|
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
|
||||||
appMasterTrackingUrl);
|
appMasterTrackingUrl);
|
||||||
// Dump out information about cluster capability as seen by the
|
// Dump out information about cluster capability as seen by the
|
||||||
|
@ -485,7 +506,7 @@ public class ApplicationMaster {
|
||||||
// executed on them ( regardless of success/failure).
|
// executed on them ( regardless of success/failure).
|
||||||
for (int i = 0; i < numTotalContainers; ++i) {
|
for (int i = 0; i < numTotalContainers; ++i) {
|
||||||
ContainerRequest containerAsk = setupContainerAskForRM();
|
ContainerRequest containerAsk = setupContainerAskForRM();
|
||||||
resourceManager.addContainerRequest(containerAsk);
|
amRMClient.addContainerRequest(containerAsk);
|
||||||
}
|
}
|
||||||
numRequestedContainers.set(numTotalContainers);
|
numRequestedContainers.set(numTotalContainers);
|
||||||
|
|
||||||
|
@ -535,7 +556,7 @@ public class ApplicationMaster {
|
||||||
success = false;
|
success = false;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
|
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
|
||||||
} catch (YarnException ex) {
|
} catch (YarnException ex) {
|
||||||
LOG.error("Failed to unregister application", ex);
|
LOG.error("Failed to unregister application", ex);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -543,7 +564,7 @@ public class ApplicationMaster {
|
||||||
}
|
}
|
||||||
|
|
||||||
done = true;
|
done = true;
|
||||||
resourceManager.stop();
|
amRMClient.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
||||||
|
@ -595,7 +616,7 @@ public class ApplicationMaster {
|
||||||
if (askCount > 0) {
|
if (askCount > 0) {
|
||||||
for (int i = 0; i < askCount; ++i) {
|
for (int i = 0; i < askCount; ++i) {
|
||||||
ContainerRequest containerAsk = setupContainerAskForRM();
|
ContainerRequest containerAsk = setupContainerAskForRM();
|
||||||
resourceManager.addContainerRequest(containerAsk);
|
amRMClient.addContainerRequest(containerAsk);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -651,7 +672,7 @@ public class ApplicationMaster {
|
||||||
@Override
|
@Override
|
||||||
public void onError(Throwable e) {
|
public void onError(Throwable e) {
|
||||||
done = true;
|
done = true;
|
||||||
resourceManager.stop();
|
amRMClient.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -807,6 +828,14 @@ public class ApplicationMaster {
|
||||||
commands.add(command.toString());
|
commands.add(command.toString());
|
||||||
ctx.setCommands(commands);
|
ctx.setCommands(commands);
|
||||||
|
|
||||||
|
// Set up tokens for the container too. Today, for normal shell commands,
|
||||||
|
// the container in distribute-shell doesn't need any tokens. We are
|
||||||
|
// populating them mainly for NodeManagers to be able to download any
|
||||||
|
// files in the distributed file-system. The tokens are otherwise also
|
||||||
|
// useful in cases, for e.g., when one is running a "hadoop dfs" command
|
||||||
|
// inside the distributed shell.
|
||||||
|
ctx.setTokens(allTokens);
|
||||||
|
|
||||||
containerListener.addContainer(container.getId(), container);
|
containerListener.addContainer(container.getId(), container);
|
||||||
nmClientAsync.startContainerAsync(container, ctx);
|
nmClientAsync.startContainerAsync(container, ctx);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.applications.distributedshell;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -40,9 +40,13 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -543,8 +547,28 @@ public class Client {
|
||||||
// Not needed in this scenario
|
// Not needed in this scenario
|
||||||
// amContainer.setServiceData(serviceData);
|
// amContainer.setServiceData(serviceData);
|
||||||
|
|
||||||
// The following are not required for launching an application master
|
// Setup security tokens
|
||||||
// amContainer.setContainerId(containerId);
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
|
||||||
|
if (tokenRenewer == null || tokenRenewer.length() == 0) {
|
||||||
|
throw new IOException(
|
||||||
|
"Can't get Master Kerberos principal for the RM to use as renewer");
|
||||||
|
}
|
||||||
|
|
||||||
|
// For now, only getting tokens for the default file-system.
|
||||||
|
final Token<?> tokens[] =
|
||||||
|
fs.addDelegationTokens(tokenRenewer, credentials);
|
||||||
|
if (tokens != null) {
|
||||||
|
for (Token<?> token : tokens) {
|
||||||
|
LOG.info("Got dt for " + fs.getUri() + "; " + token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
credentials.writeTokenStorageToStream(dob);
|
||||||
|
ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
|
amContainer.setTokens(fsTokens);
|
||||||
|
}
|
||||||
|
|
||||||
appContext.setAMContainerSpec(amContainer);
|
appContext.setAMContainerSpec(amContainer);
|
||||||
|
|
||||||
|
|
|
@ -166,7 +166,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
final YarnConfiguration conf = new YarnConfiguration(getConfig());
|
final YarnConfiguration conf = new YarnConfiguration(getConfig());
|
||||||
try {
|
try {
|
||||||
rmClient = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
|
rmClient =
|
||||||
|
ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new YarnRuntimeException(e);
|
throw new YarnRuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue