YARN-579. Stop setting the Application Token in the AppMaster env, in favour of the copy present in the container token field. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1471814 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-04-25 02:46:47 +00:00
parent daaa73b657
commit be256a1819
6 changed files with 99 additions and 89 deletions

View File

@ -98,6 +98,10 @@ Release 2.0.5-beta - UNRELEASED
YARN-561. Modified NodeManager to set key information into the environment YARN-561. Modified NodeManager to set key information into the environment
of every container that it launches. (Xuan Gong via vinodkv) of every container that it launches. (Xuan Gong via vinodkv)
YARN-579. Stop setting the Application Token in the AppMaster env, in
favour of the copy present in the container token field.
(Vinod Kumar Vavilapalli via sseth)
NEW FEATURES NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues. YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -29,10 +29,6 @@ import org.apache.hadoop.util.Shell;
*/ */
public interface ApplicationConstants { public interface ApplicationConstants {
// TODO: They say tokens via env isn't good.
public static final String APPLICATION_MASTER_TOKEN_ENV_NAME =
"AppMasterTokenEnv";
// TODO: They say tokens via env isn't good. // TODO: They say tokens via env isn't good.
public static final String APPLICATION_CLIENT_SECRET_ENV_NAME = public static final String APPLICATION_CLIENT_SECRET_ENV_NAME =
"AppClientSecretEnv"; "AppClientSecretEnv";

View File

@ -33,13 +33,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -115,24 +111,7 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
throw new YarnException(e); throw new YarnException(e);
} }
if (UserGroupInformation.isSecurityEnabled()) { // CurrentUser should already have AMToken loaded.
String tokenURLEncodedStr = System.getenv().get(
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
try {
token.decodeFromUrlString(tokenURLEncodedStr);
} catch (IOException e) {
throw new YarnException(e);
}
SecurityUtil.setTokenService(token, rmAddress);
if (LOG.isDebugEnabled()) {
LOG.debug("AppMasterToken is " + token);
}
currentUser.addToken(token);
}
rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() { rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
@Override @Override
public AMRMProtocol run() { public AMRMProtocol run() {

View File

@ -204,7 +204,7 @@ public class AMLauncher implements Runnable {
ApplicationTokenIdentifier id = new ApplicationTokenIdentifier( ApplicationTokenIdentifier id = new ApplicationTokenIdentifier(
application.getAppAttemptId()); application.getAppAttemptId());
Token<ApplicationTokenIdentifier> token = Token<ApplicationTokenIdentifier> appMasterToken =
new Token<ApplicationTokenIdentifier>(id, new Token<ApplicationTokenIdentifier>(id,
this.rmContext.getApplicationTokenSecretManager()); this.rmContext.getApplicationTokenSecretManager());
InetSocketAddress serviceAddr = conf.getSocketAddr( InetSocketAddress serviceAddr = conf.getSocketAddr(
@ -212,16 +212,11 @@ public class AMLauncher implements Runnable {
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
// normally the client should set the service after acquiring the token, // normally the client should set the service after acquiring the token,
// but this token is directly provided to the tasks // but this token is directly provided to the AMs
SecurityUtil.setTokenService(token, serviceAddr); SecurityUtil.setTokenService(appMasterToken, serviceAddr);
String appMasterTokenEncoded = token.encodeToUrlString();
LOG.debug("Putting appMaster token in env : " + token);
environment.put(
ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
appMasterTokenEncoded);
// Add the RM token // Add the ApplicationMaster token
credentials.addToken(token.getService(), token); credentials.addToken(appMasterToken.getService(), appMasterToken);
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob); credentials.writeTokenStorageToStream(dob);
container.setContainerTokens( container.setContainerTokens(

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -28,11 +29,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
@ -59,9 +59,17 @@ public class TestAMAuthorization {
private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class); private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class);
private static final Configuration confWithSecurityEnabled =
new Configuration();
static {
confWithSecurityEnabled.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(confWithSecurityEnabled);
}
public static final class MyContainerManager implements ContainerManager { public static final class MyContainerManager implements ContainerManager {
public Map<String, String> amContainerEnv; public ByteBuffer amTokens;
public MyContainerManager() { public MyContainerManager() {
} }
@ -70,7 +78,7 @@ public class TestAMAuthorization {
public StartContainerResponse public StartContainerResponse
startContainer(StartContainerRequest request) startContainer(StartContainerRequest request)
throws YarnRemoteException { throws YarnRemoteException {
amContainerEnv = request.getContainerLaunchContext().getEnvironment(); amTokens = request.getContainerLaunchContext().getContainerTokens();
return null; return null;
} }
@ -93,9 +101,6 @@ public class TestAMAuthorization {
public MockRMWithAMS(Configuration conf, ContainerManager containerManager) { public MockRMWithAMS(Configuration conf, ContainerManager containerManager) {
super(conf, containerManager); super(conf, containerManager);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
} }
@Override @Override
@ -105,7 +110,6 @@ public class TestAMAuthorization {
@Override @Override
protected ApplicationMasterService createApplicationMasterService() { protected ApplicationMasterService createApplicationMasterService() {
return new ApplicationMasterService(getRMContext(), this.scheduler); return new ApplicationMasterService(getRMContext(), this.scheduler);
} }
} }
@ -113,7 +117,8 @@ public class TestAMAuthorization {
@Test @Test
public void testAuthorizedAccess() throws Exception { public void testAuthorizedAccess() throws Exception {
MyContainerManager containerManager = new MyContainerManager(); MyContainerManager containerManager = new MyContainerManager();
final MockRM rm = new MockRMWithAMS(new Configuration(), containerManager); final MockRM rm =
new MockRMWithAMS(confWithSecurityEnabled, containerManager);
rm.start(); rm.start();
MockNM nm1 = rm.registerNode("localhost:1234", 5120); MockNM nm1 = rm.registerNode("localhost:1234", 5120);
@ -126,11 +131,11 @@ public class TestAMAuthorization {
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
int waitCount = 0; int waitCount = 0;
while (containerManager.amContainerEnv == null && waitCount++ < 20) { while (containerManager.amTokens == null && waitCount++ < 20) {
LOG.info("Waiting for AM Launch to happen.."); LOG.info("Waiting for AM Launch to happen..");
Thread.sleep(1000); Thread.sleep(1000);
} }
Assert.assertNotNull(containerManager.amContainerEnv); Assert.assertNotNull(containerManager.amTokens);
RMAppAttempt attempt = app.getCurrentAppAttempt(); RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
@ -142,12 +147,12 @@ public class TestAMAuthorization {
UserGroupInformation currentUser = UserGroupInformation UserGroupInformation currentUser = UserGroupInformation
.createRemoteUser(applicationAttemptId.toString()); .createRemoteUser(applicationAttemptId.toString());
String tokenURLEncodedStr = containerManager.amContainerEnv Credentials credentials = new Credentials();
.get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME); DataInputByteBuffer buf = new DataInputByteBuffer();
LOG.info("AppMasterToken is " + tokenURLEncodedStr); containerManager.amTokens.rewind();
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>(); buf.reset(containerManager.amTokens);
token.decodeFromUrlString(tokenURLEncodedStr); credentials.readTokenStorageStream(buf);
currentUser.addToken(token); currentUser.addCredentials(credentials);
AMRMProtocol client = currentUser AMRMProtocol client = currentUser
.doAs(new PrivilegedAction<AMRMProtocol>() { .doAs(new PrivilegedAction<AMRMProtocol>() {
@ -172,7 +177,7 @@ public class TestAMAuthorization {
@Test @Test
public void testUnauthorizedAccess() throws Exception { public void testUnauthorizedAccess() throws Exception {
MyContainerManager containerManager = new MyContainerManager(); MyContainerManager containerManager = new MyContainerManager();
MockRM rm = new MockRMWithAMS(new Configuration(), containerManager); MockRM rm = new MockRMWithAMS(confWithSecurityEnabled, containerManager);
rm.start(); rm.start();
MockNM nm1 = rm.registerNode("localhost:1234", 5120); MockNM nm1 = rm.registerNode("localhost:1234", 5120);
@ -182,17 +187,16 @@ public class TestAMAuthorization {
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
int waitCount = 0; int waitCount = 0;
while (containerManager.amContainerEnv == null && waitCount++ < 20) { while (containerManager.amTokens == null && waitCount++ < 20) {
LOG.info("Waiting for AM Launch to happen.."); LOG.info("Waiting for AM Launch to happen..");
Thread.sleep(1000); Thread.sleep(1000);
} }
Assert.assertNotNull(containerManager.amContainerEnv); Assert.assertNotNull(containerManager.amTokens);
RMAppAttempt attempt = app.getCurrentAppAttempt(); RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
waitForLaunchedState(attempt); waitForLaunchedState(attempt);
// Create a client to the RM.
final Configuration conf = rm.getConfig(); final Configuration conf = rm.getConfig();
final YarnRPC rpc = YarnRPC.create(conf); final YarnRPC rpc = YarnRPC.create(conf);
final InetSocketAddress serviceAddr = conf.getSocketAddr( final InetSocketAddress serviceAddr = conf.getSocketAddr(
@ -202,13 +206,8 @@ public class TestAMAuthorization {
UserGroupInformation currentUser = UserGroupInformation UserGroupInformation currentUser = UserGroupInformation
.createRemoteUser(applicationAttemptId.toString()); .createRemoteUser(applicationAttemptId.toString());
String tokenURLEncodedStr = containerManager.amContainerEnv
.get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
LOG.info("AppMasterToken is " + tokenURLEncodedStr);
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
token.decodeFromUrlString(tokenURLEncodedStr);
currentUser.addToken(token);
// First try contacting NM without tokens
AMRMProtocol client = currentUser AMRMProtocol client = currentUser
.doAs(new PrivilegedAction<AMRMProtocol>() { .doAs(new PrivilegedAction<AMRMProtocol>() {
@Override @Override
@ -217,9 +216,39 @@ public class TestAMAuthorization {
serviceAddr, conf); serviceAddr, conf);
} }
}); });
RegisterApplicationMasterRequest request = Records RegisterApplicationMasterRequest request = Records
.newRecord(RegisterApplicationMasterRequest.class); .newRecord(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
try {
client.registerApplicationMaster(request);
Assert.fail("Should fail with authorization error");
} catch (Exception e) {
// Because there are no tokens, the request should be rejected as the
// server side will assume we are trying simple auth.
Assert.assertTrue(e.getCause().getMessage().contains(
"SIMPLE authentication is not enabled. "
+ "Available:[KERBEROS, DIGEST]"));
}
// Now try to validate invalid authorization.
Credentials credentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();
containerManager.amTokens.rewind();
buf.reset(containerManager.amTokens);
credentials.readTokenStorageStream(buf);
currentUser.addCredentials(credentials);
// Create a client to the RM.
client = currentUser
.doAs(new PrivilegedAction<AMRMProtocol>() {
@Override
public AMRMProtocol run() {
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
serviceAddr, conf);
}
});
request = Records.newRecord(RegisterApplicationMasterRequest.class);
ApplicationAttemptId otherAppAttemptId = BuilderUtils ApplicationAttemptId otherAppAttemptId = BuilderUtils
.newApplicationAttemptId(applicationAttemptId.getApplicationId(), 42); .newApplicationAttemptId(applicationAttemptId.getApplicationId(), 42);
request.setApplicationAttemptId(otherAppAttemptId); request.setApplicationAttemptId(otherAppAttemptId);

View File

@ -25,11 +25,11 @@ import javax.crypto.SecretKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@ -51,6 +51,14 @@ public class TestApplicationTokens {
private static final Log LOG = LogFactory.getLog(TestApplicationTokens.class); private static final Log LOG = LogFactory.getLog(TestApplicationTokens.class);
private static final Configuration confWithSecurityEnabled =
new Configuration();
static {
confWithSecurityEnabled.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(confWithSecurityEnabled);
}
/** /**
* Validate that application tokens are unusable after the * Validate that application tokens are unusable after the
* application-finishes. * application-finishes.
@ -61,7 +69,8 @@ public class TestApplicationTokens {
public void testTokenExpiry() throws Exception { public void testTokenExpiry() throws Exception {
MyContainerManager containerManager = new MyContainerManager(); MyContainerManager containerManager = new MyContainerManager();
final MockRM rm = new MockRMWithAMS(new Configuration(), containerManager); final MockRM rm =
new MockRMWithAMS(confWithSecurityEnabled, containerManager);
rm.start(); rm.start();
final Configuration conf = rm.getConfig(); final Configuration conf = rm.getConfig();
@ -76,11 +85,11 @@ public class TestApplicationTokens {
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
int waitCount = 0; int waitCount = 0;
while (containerManager.amContainerEnv == null && waitCount++ < 20) { while (containerManager.amTokens == null && waitCount++ < 20) {
LOG.info("Waiting for AM Launch to happen.."); LOG.info("Waiting for AM Launch to happen..");
Thread.sleep(1000); Thread.sleep(1000);
} }
Assert.assertNotNull(containerManager.amContainerEnv); Assert.assertNotNull(containerManager.amTokens);
RMAppAttempt attempt = app.getCurrentAppAttempt(); RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
@ -89,13 +98,12 @@ public class TestApplicationTokens {
UserGroupInformation currentUser = UserGroupInformation currentUser =
UserGroupInformation UserGroupInformation
.createRemoteUser(applicationAttemptId.toString()); .createRemoteUser(applicationAttemptId.toString());
String tokenURLEncodedStr = Credentials credentials = new Credentials();
containerManager.amContainerEnv DataInputByteBuffer buf = new DataInputByteBuffer();
.get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME); containerManager.amTokens.rewind();
LOG.info("AppMasterToken is " + tokenURLEncodedStr); buf.reset(containerManager.amTokens);
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>(); credentials.readTokenStorageStream(buf);
token.decodeFromUrlString(tokenURLEncodedStr); currentUser.addCredentials(credentials);
currentUser.addToken(token);
rmClient = createRMClient(rm, conf, rpc, currentUser); rmClient = createRMClient(rm, conf, rpc, currentUser);
@ -152,9 +160,9 @@ public class TestApplicationTokens {
@Test @Test
public void testMasterKeyRollOver() throws Exception { public void testMasterKeyRollOver() throws Exception {
Configuration config = new Configuration();
MyContainerManager containerManager = new MyContainerManager(); MyContainerManager containerManager = new MyContainerManager();
final MockRM rm = new MockRMWithAMS(config, containerManager); final MockRM rm =
new MockRMWithAMS(confWithSecurityEnabled, containerManager);
rm.start(); rm.start();
final Configuration conf = rm.getConfig(); final Configuration conf = rm.getConfig();
@ -169,11 +177,11 @@ public class TestApplicationTokens {
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
int waitCount = 0; int waitCount = 0;
while (containerManager.amContainerEnv == null && waitCount++ < 20) { while (containerManager.amTokens == null && waitCount++ < 20) {
LOG.info("Waiting for AM Launch to happen.."); LOG.info("Waiting for AM Launch to happen..");
Thread.sleep(1000); Thread.sleep(1000);
} }
Assert.assertNotNull(containerManager.amContainerEnv); Assert.assertNotNull(containerManager.amTokens);
RMAppAttempt attempt = app.getCurrentAppAttempt(); RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
@ -182,13 +190,12 @@ public class TestApplicationTokens {
UserGroupInformation currentUser = UserGroupInformation currentUser =
UserGroupInformation UserGroupInformation
.createRemoteUser(applicationAttemptId.toString()); .createRemoteUser(applicationAttemptId.toString());
String tokenURLEncodedStr = Credentials credentials = new Credentials();
containerManager.amContainerEnv DataInputByteBuffer buf = new DataInputByteBuffer();
.get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME); containerManager.amTokens.rewind();
LOG.info("AppMasterToken is " + tokenURLEncodedStr); buf.reset(containerManager.amTokens);
Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>(); credentials.readTokenStorageStream(buf);
token.decodeFromUrlString(tokenURLEncodedStr); currentUser.addCredentials(credentials);
currentUser.addToken(token);
rmClient = createRMClient(rm, conf, rpc, currentUser); rmClient = createRMClient(rm, conf, rpc, currentUser);