YARN-701. Use application tokens irrespective of secure or non-secure mode. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1504604 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-07-18 19:03:48 +00:00
parent 7ec67c5118
commit 32bc200d54
15 changed files with 367 additions and 151 deletions

View File

@ -71,6 +71,8 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -1392,6 +1395,18 @@ public class TestRMContainerAllocator {
@Override
protected void register() {
ApplicationAttemptId attemptId = getContext().getApplicationAttemptId();
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token =
rm.getRMContext().getRMApps().get(attemptId.getApplicationId())
.getRMAppAttempt(attemptId).getAMRMToken();
try {
ugi.addTokenIdentifier(token.decodeIdentifier());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
UserGroupInformation.setLoginUser(ugi);
super.register();
}

View File

@ -250,6 +250,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-727. ClientRMProtocol.getAllApplications should accept ApplicationType as
a parameter. (Xuan Gong via hitesh)
YARN-701. Use application tokens irrespective of secure or non-secure
mode. (vinodkv via acmurthy)
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -40,7 +40,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class TestUnmanagedAMLauncher {
/**
private static final Log LOG = LogFactory
.getLog(TestUnmanagedAMLauncher.class);
@ -185,5 +185,5 @@ public class TestUnmanagedAMLauncher {
// Expected
}
}
*/
}

View File

@ -24,7 +24,10 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
@ -33,11 +36,13 @@ import java.util.TreeSet;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -48,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
@ -57,12 +63,15 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.AfterClass;
@ -71,6 +80,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mortbay.log.Log;
public class TestAMRMClient {
static Configuration conf = null;
@ -130,11 +140,14 @@ public class TestAMRMClient {
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue("default");
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records
.newRecord(ContainerLaunchContext.class);
ContainerLaunchContext amContainer =
BuilderUtils.newContainerLaunchContext(
Collections.<String, LocalResource> emptyMap(),
new HashMap<String, String>(), Arrays.asList("sleep", "100"),
new HashMap<String, ByteBuffer>(), null,
new HashMap<ApplicationAccessType, String>());
appContext.setAMContainerSpec(amContainer);
// unmanaged AM
appContext.setUnmanagedAM(true);
appContext.setResource(Resource.newInstance(1024, 1));
// Create the request to send to the applications manager
SubmitApplicationRequest appRequest = Records
.newRecord(SubmitApplicationRequest.class);
@ -143,17 +156,32 @@ public class TestAMRMClient {
yarnClient.submitApplication(appContext);
// wait for app to start
RMAppAttempt appAttempt = null;
while (true) {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
attemptId = appReport.getCurrentApplicationAttemptId();
appAttempt =
yarnCluster.getResourceManager().getRMContext().getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
while (true) {
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
break;
}
}
break;
}
}
// Just dig into the ResourceManager and get the AMRMToken just for the sake
// of testing.
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
}
@After
public void cancelApp() {
public void cancelApp() throws YarnException, IOException {
yarnClient.killApplication(attemptId.getApplicationId());
attemptId = null;
}
@ -403,6 +431,7 @@ public class TestAMRMClient {
int iterationsLeft = 3;
while (allocatedContainerCount < 2
&& iterationsLeft-- > 0) {
Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);

View File

@ -33,6 +33,7 @@ import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@ -53,13 +54,15 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
@ -122,11 +125,20 @@ public class TestNMClient {
// wait for app to start
int iterationsLeft = 30;
RMAppAttempt appAttempt = null;
while (iterationsLeft > 0) {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
if (appReport.getYarnApplicationState() ==
YarnApplicationState.ACCEPTED) {
attemptId = appReport.getCurrentApplicationAttemptId();
appAttempt =
yarnCluster.getResourceManager().getRMContext().getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
while (true) {
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
break;
}
}
break;
}
sleep(1000);
@ -136,6 +148,12 @@ public class TestNMClient {
fail("Application hasn't bee started");
}
// Just dig into the ResourceManager and get the AMRMToken just for the sake
// of testing.
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
// start am rm client
rmClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@ -103,7 +105,6 @@ public class ApplicationMasterService extends AbstractService implements
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler;
this.resync.setAMCommand(AMCommand.AM_RESYNC);
// this.reboot.containers = new ArrayList<Container>();
this.rmContext = rmContext;
}
@ -117,10 +118,17 @@ public class ApplicationMasterService extends AbstractService implements
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
Configuration serverConf = conf;
if (!UserGroupInformation.isSecurityEnabled()) {
// If the auth is not-simple, enforce it to be token-based.
serverConf = new Configuration(conf);
serverConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
UserGroupInformation.AuthenticationMethod.TOKEN.toString());
}
this.server =
rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress,
conf, this.rmContext.getAMRMTokenSecretManager(),
conf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
serverConf, this.rmContext.getAMRMTokenSecretManager(),
serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
// Enable service authorization?
@ -142,13 +150,26 @@ public class ApplicationMasterService extends AbstractService implements
return this.bindAddress;
}
// Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
// currently sets only the required id, but iterate through anyways just to be
// sure.
private AMRMTokenIdentifier selectAMRMTokenIdentifier(
UserGroupInformation remoteUgi) throws IOException {
AMRMTokenIdentifier result = null;
Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
for (TokenIdentifier tokenId : tokenIds) {
if (tokenId instanceof AMRMTokenIdentifier) {
result = (AMRMTokenIdentifier) tokenId;
break;
}
}
return result;
}
private void authorizeRequest(ApplicationAttemptId appAttemptID)
throws YarnException {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
String appAttemptIDStr = appAttemptID.toString();
UserGroupInformation remoteUgi;
@ -162,9 +183,33 @@ public class ApplicationMasterService extends AbstractService implements
throw RPCUtil.getRemoteException(msg);
}
if (!remoteUgi.getUserName().equals(appAttemptIDStr)) {
boolean tokenFound = false;
String message = "";
AMRMTokenIdentifier appTokenIdentifier = null;
try {
appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
if (appTokenIdentifier == null) {
tokenFound = false;
message = "No AMRMToken found for " + appAttemptIDStr;
} else {
tokenFound = true;
}
} catch (IOException e) {
tokenFound = false;
message =
"Got exception while looking for AMRMToken for " + appAttemptIDStr;
}
if (!tokenFound) {
LOG.warn(message);
throw RPCUtil.getRemoteException(message);
}
ApplicationAttemptId remoteApplicationAttemptId =
appTokenIdentifier.getApplicationAttemptId();
if (!remoteApplicationAttemptId.equals(appAttemptID)) {
String msg = "Unauthorized request from ApplicationMaster. "
+ "Expected ApplicationAttemptID: " + remoteUgi.getUserName()
+ "Expected ApplicationAttemptID: " + remoteApplicationAttemptId
+ " Found: " + appAttemptIDStr;
LOG.warn(msg);
throw RPCUtil.getRemoteException(msg);

View File

@ -57,7 +57,7 @@ public class RMContextImpl implements RMContext {
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private final DelegationTokenRenewer tokenRenewer;
private final AMRMTokenSecretManager appTokenSecretManager;
private final AMRMTokenSecretManager amRMTokenSecretManager;
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
@ -68,7 +68,7 @@ public class RMContextImpl implements RMContext {
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
DelegationTokenRenewer tokenRenewer,
AMRMTokenSecretManager appTokenSecretManager,
AMRMTokenSecretManager amRMTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
@ -78,7 +78,7 @@ public class RMContextImpl implements RMContext {
this.amLivelinessMonitor = amLivelinessMonitor;
this.amFinishingMonitor = amFinishingMonitor;
this.tokenRenewer = tokenRenewer;
this.appTokenSecretManager = appTokenSecretManager;
this.amRMTokenSecretManager = amRMTokenSecretManager;
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
@ -156,7 +156,7 @@ public class RMContextImpl implements RMContext {
@Override
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
return this.appTokenSecretManager;
return this.amRMTokenSecretManager;
}
@Override

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@ -193,30 +192,28 @@ public class AMLauncher implements Runnable {
environment.put(ApplicationConstants.MAX_APP_ATTEMPTS_ENV,
String.valueOf(rmContext.getRMApps().get(
applicationId).getMaxAppAttempts()));
Credentials credentials = new Credentials();
if (UserGroupInformation.isSecurityEnabled()) {
// TODO: Security enabled/disabled info should come from RM.
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
if (container.getTokens() != null) {
// TODO: Don't do this kind of checks everywhere.
dibb.reset(container.getTokens());
credentials.readTokenStorageStream(dibb);
}
// Add application token
Token<AMRMTokenIdentifier> amrmToken =
application.getAMRMToken();
if(amrmToken != null) {
credentials.addToken(amrmToken.getService(), amrmToken);
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
container.setTokens(ByteBuffer.wrap(dob.getData(), 0,
dob.getLength()));
}
// Add AMRMToken
Token<AMRMTokenIdentifier> amrmToken = application.getAMRMToken();
if (amrmToken != null) {
credentials.addToken(amrmToken.getService(), amrmToken);
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
}
@SuppressWarnings("unchecked")

View File

@ -682,23 +682,24 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.clientToAMToken =
clientToAMTokenSelector.selectToken(new Text(),
appAttemptTokens.getAllTokens());
InetSocketAddress serviceAddr = conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
AMRMTokenSelector appTokenSelector = new AMRMTokenSelector();
this.amrmToken =
appTokenSelector.selectToken(
SecurityUtil.buildTokenService(serviceAddr),
appAttemptTokens.getAllTokens());
// For now, no need to populate tokens back to
// AMRMTokenSecretManager, because running attempts are rebooted
// Later in work-preserve restart, we'll create NEW->RUNNING transition
// in which the restored tokens will be added to the secret manager
}
InetSocketAddress serviceAddr =
conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
AMRMTokenSelector appTokenSelector = new AMRMTokenSelector();
this.amrmToken =
appTokenSelector.selectToken(
SecurityUtil.buildTokenService(serviceAddr),
appAttemptTokens.getAllTokens());
// For now, no need to populate tokens back to AMRMTokenSecretManager,
// because running attempts are rebooted. Later in work-preserve restart,
// we'll create NEW->RUNNING transition in which the restored tokens will be
// added to the secret manager
}
private static class BaseTransition implements
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
@ -730,26 +731,24 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
new Token<ClientToAMTokenIdentifier>(new ClientToAMTokenIdentifier(
appAttempt.applicationAttemptId),
appAttempt.rmContext.getClientToAMTokenSecretManager());
// create application token
AMRMTokenIdentifier id =
new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
Token<AMRMTokenIdentifier> amRmToken =
new Token<AMRMTokenIdentifier>(id,
appAttempt.rmContext.getAMRMTokenSecretManager());
InetSocketAddress serviceAddr =
appAttempt.conf.getSocketAddr(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
// normally the client should set the service after acquiring the
// token, but this token is directly provided to the AMs
SecurityUtil.setTokenService(amRmToken, serviceAddr);
appAttempt.amrmToken = amRmToken;
}
// create AMRMToken
AMRMTokenIdentifier id =
new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
Token<AMRMTokenIdentifier> amRmToken =
new Token<AMRMTokenIdentifier>(id,
appAttempt.rmContext.getAMRMTokenSecretManager());
InetSocketAddress serviceAddr =
appAttempt.conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
// normally the client should set the service after acquiring the
// token, but this token is directly provided to the AMs
SecurityUtil.setTokenService(amRmToken, serviceAddr);
appAttempt.amrmToken = amRmToken;
// Add the application to the scheduler
appAttempt.eventHandler.handle(
new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,

View File

@ -18,11 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -35,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -89,13 +94,29 @@ public class MockAM {
waitForState(RMAppAttemptState.LAUNCHED);
}
responseId = 0;
RegisterApplicationMasterRequest req =
final RegisterApplicationMasterRequest req =
Records.newRecord(RegisterApplicationMasterRequest.class);
req.setApplicationAttemptId(attemptId);
req.setHost("");
req.setRpcPort(1);
req.setTrackingUrl("");
return amRMProtocol.registerApplicationMaster(req);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token =
context.getRMApps().get(attemptId.getApplicationId())
.getRMAppAttempt(attemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
try {
return ugi
.doAs(new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
@Override
public RegisterApplicationMasterResponse run() throws Exception {
return amRMProtocol.registerApplicationMaster(req);
}
});
} catch (UndeclaredThrowableException e) {
throw (Exception) e.getCause();
}
}
public void addRequests(String[] hosts, int memory, int priority,
@ -153,18 +174,46 @@ public class MockAM {
public AllocateResponse allocate(
List<ResourceRequest> resourceRequest, List<ContainerId> releases)
throws Exception {
AllocateRequest req = AllocateRequest.newInstance(attemptId,
final AllocateRequest req = AllocateRequest.newInstance(attemptId,
++responseId, 0F, resourceRequest, releases, null);
return amRMProtocol.allocate(req);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token =
context.getRMApps().get(attemptId.getApplicationId())
.getRMAppAttempt(attemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
try {
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
@Override
public AllocateResponse run() throws Exception {
return amRMProtocol.allocate(req);
}
});
} catch (UndeclaredThrowableException e) {
throw (Exception) e.getCause();
}
}
public void unregisterAppAttempt() throws Exception {
waitForState(RMAppAttemptState.RUNNING);
FinishApplicationMasterRequest req = Records.newRecord(FinishApplicationMasterRequest.class);
final FinishApplicationMasterRequest req =
Records.newRecord(FinishApplicationMasterRequest.class);
req.setAppAttemptId(attemptId);
req.setDiagnostics("");
req.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
req.setTrackingUrl("");
amRMProtocol.finishApplicationMaster(req);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token =
context.getRMApps().get(attemptId.getApplicationId())
.getRMAppAttempt(attemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
amRMProtocol.finishApplicationMaster(req);
return null;
}
});
}
}

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@ -54,22 +56,35 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class TestAMAuthorization {
private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class);
private static final Configuration confWithSecurityEnabled =
new Configuration();
static {
confWithSecurityEnabled.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(confWithSecurityEnabled);
private final Configuration conf;
@Parameters
public static Collection<Object[]> configs() {
Configuration conf = new Configuration();
Configuration confWithSecurity = new Configuration();
confWithSecurity.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
UserGroupInformation.AuthenticationMethod.KERBEROS.toString());
return Arrays.asList(new Object[][] {{ conf }, { confWithSecurity} });
}
public TestAMAuthorization(Configuration conf) {
this.conf = conf;
UserGroupInformation.setConfiguration(conf);
}
public static final class MyContainerManager implements ContainerManagementProtocol {
public ByteBuffer amTokens;
public ByteBuffer containerTokens;
public MyContainerManager() {
}
@ -78,23 +93,30 @@ public class TestAMAuthorization {
public StartContainerResponse
startContainer(StartContainerRequest request)
throws YarnException {
amTokens = request.getContainerLaunchContext().getTokens();
containerTokens = request.getContainerLaunchContext().getTokens();
return null;
}
@Override
public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnException {
// TODO Auto-generated method stub
return null;
}
@Override
public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnException {
// TODO Auto-generated method stub
return null;
}
public Credentials getContainerCredentials() throws IOException {
Credentials credentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();
containerTokens.rewind();
buf.reset(containerTokens);
credentials.readTokenStorageStream(buf);
return credentials;
}
}
public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
@ -118,7 +140,7 @@ public class TestAMAuthorization {
public void testAuthorizedAccess() throws Exception {
MyContainerManager containerManager = new MyContainerManager();
final MockRM rm =
new MockRMWithAMS(confWithSecurityEnabled, containerManager);
new MockRMWithAMS(conf, containerManager);
rm.start();
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
@ -131,11 +153,11 @@ public class TestAMAuthorization {
nm1.nodeHeartbeat(true);
int waitCount = 0;
while (containerManager.amTokens == null && waitCount++ < 20) {
while (containerManager.containerTokens == null && waitCount++ < 20) {
LOG.info("Waiting for AM Launch to happen..");
Thread.sleep(1000);
}
Assert.assertNotNull(containerManager.amTokens);
Assert.assertNotNull(containerManager.containerTokens);
RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
@ -147,11 +169,7 @@ public class TestAMAuthorization {
UserGroupInformation currentUser = UserGroupInformation
.createRemoteUser(applicationAttemptId.toString());
Credentials credentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();
containerManager.amTokens.rewind();
buf.reset(containerManager.amTokens);
credentials.readTokenStorageStream(buf);
Credentials credentials = containerManager.getContainerCredentials();
currentUser.addCredentials(credentials);
ApplicationMasterProtocol client = currentUser
@ -169,8 +187,10 @@ public class TestAMAuthorization {
RegisterApplicationMasterResponse response =
client.registerApplicationMaster(request);
Assert.assertNotNull(response.getClientToAMTokenMasterKey());
Assert
if (UserGroupInformation.isSecurityEnabled()) {
Assert
.assertTrue(response.getClientToAMTokenMasterKey().array().length > 0);
}
Assert.assertEquals("Register response has bad ACLs", "*",
response.getApplicationACLs().get(ApplicationAccessType.VIEW_APP));
@ -180,7 +200,7 @@ public class TestAMAuthorization {
@Test
public void testUnauthorizedAccess() throws Exception {
MyContainerManager containerManager = new MyContainerManager();
MockRM rm = new MockRMWithAMS(confWithSecurityEnabled, containerManager);
MockRM rm = new MockRMWithAMS(conf, containerManager);
rm.start();
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
@ -190,11 +210,11 @@ public class TestAMAuthorization {
nm1.nodeHeartbeat(true);
int waitCount = 0;
while (containerManager.amTokens == null && waitCount++ < 40) {
while (containerManager.containerTokens == null && waitCount++ < 40) {
LOG.info("Waiting for AM Launch to happen..");
Thread.sleep(1000);
}
Assert.assertNotNull(containerManager.amTokens);
Assert.assertNotNull(containerManager.containerTokens);
RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
@ -229,17 +249,19 @@ public class TestAMAuthorization {
} catch (Exception e) {
// Because there are no tokens, the request should be rejected as the
// server side will assume we are trying simple auth.
String availableAuthMethods;
if (UserGroupInformation.isSecurityEnabled()) {
availableAuthMethods = "[TOKEN, KERBEROS]";
} else {
availableAuthMethods = "[TOKEN]";
}
Assert.assertTrue(e.getCause().getMessage().contains(
"SIMPLE authentication is not enabled. "
+ "Available:[TOKEN, KERBEROS]"));
+ "Available:" + availableAuthMethods));
}
// Now try to validate invalid authorization.
Credentials credentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();
containerManager.amTokens.rewind();
buf.reset(containerManager.amTokens);
credentials.readTokenStorageStream(buf);
Credentials credentials = containerManager.getContainerCredentials();
currentUser.addCredentials(credentials);
// Create a client to the RM.
@ -252,7 +274,8 @@ public class TestAMAuthorization {
}
});
request = Records.newRecord(RegisterApplicationMasterRequest.class);
request =
Records.newRecord(RegisterApplicationMasterRequest.class);
ApplicationAttemptId otherAppAttemptId = BuilderUtils
.newApplicationAttemptId(applicationAttemptId.getApplicationId(), 42);
request.setApplicationAttemptId(otherAppAttemptId);

View File

@ -18,17 +18,22 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -87,6 +92,22 @@ public class TestAMRMRPCNodeUpdates {
dispatcher.await();
}
private AllocateResponse allocate(final AllocateRequest req) throws Exception {
ApplicationAttemptId attemptId = req.getApplicationAttemptId();
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString());
Token<AMRMTokenIdentifier> token =
rm.getRMContext().getRMApps().get(attemptId.getApplicationId())
.getRMAppAttempt(attemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
@Override
public AllocateResponse run() throws Exception {
return amService.allocate(req);
}
});
}
@Test
public void testAMRMUnusableNodes() throws Exception {
@ -109,7 +130,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns no updated node
AllocateRequest allocateRequest1 = AllocateRequest.newInstance(attempt1
.getAppAttemptId(), 0, 0F, null, null, null);
AllocateResponse response1 = amService.allocate(allocateRequest1);
AllocateResponse response1 = allocate(allocateRequest1);
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
@ -118,7 +139,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns updated node
allocateRequest1 = AllocateRequest.newInstance(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
response1 = amService.allocate(allocateRequest1);
response1 = allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
NodeReport nr = updatedNodes.iterator().next();
@ -126,7 +147,7 @@ public class TestAMRMRPCNodeUpdates {
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
// resending the allocate request returns the same result
response1 = amService.allocate(allocateRequest1);
response1 = allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -138,7 +159,7 @@ public class TestAMRMRPCNodeUpdates {
// subsequent allocate request returns delta
allocateRequest1 = AllocateRequest.newInstance(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
response1 = amService.allocate(allocateRequest1);
response1 = allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -158,7 +179,7 @@ public class TestAMRMRPCNodeUpdates {
// allocate request returns no updated node
AllocateRequest allocateRequest2 = AllocateRequest.newInstance(attempt2
.getAppAttemptId(), 0, 0F, null, null, null);
AllocateResponse response2 = amService.allocate(allocateRequest2);
AllocateResponse response2 = allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());
@ -167,7 +188,7 @@ public class TestAMRMRPCNodeUpdates {
// both AM's should get delta updated nodes
allocateRequest1 = AllocateRequest.newInstance(attempt1
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
response1 = amService.allocate(allocateRequest1);
response1 = allocate(allocateRequest1);
updatedNodes = response1.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -176,7 +197,7 @@ public class TestAMRMRPCNodeUpdates {
allocateRequest2 = AllocateRequest.newInstance(attempt2
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null);
response2 = amService.allocate(allocateRequest2);
response2 = allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(1, updatedNodes.size());
nr = updatedNodes.iterator().next();
@ -186,7 +207,7 @@ public class TestAMRMRPCNodeUpdates {
// subsequent allocate calls should return no updated nodes
allocateRequest2 = AllocateRequest.newInstance(attempt2
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null);
response2 = amService.allocate(allocateRequest2);
response2 = allocate(allocateRequest2);
updatedNodes = response2.getUpdatedNodes();
Assert.assertEquals(0, updatedNodes.size());

View File

@ -18,19 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.security.PrivilegedExceptionAction;
import junit.framework.Assert;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.junit.After;
@ -39,20 +40,13 @@ import org.junit.Test;
public class TestAMRMRPCResponseId {
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private MockRM rm;
ApplicationMasterService amService = null;
private ClientRMService clientService;
private RMContext context;
@Before
public void setUp() {
this.rm = new MockRM();
rm.start();
this.clientService = rm.getClientRMService();
amService = rm.getApplicationMasterService();
}
@ -63,6 +57,22 @@ public class TestAMRMRPCResponseId {
}
}
private AllocateResponse allocate(final AllocateRequest req) throws Exception {
ApplicationAttemptId attemptId = req.getApplicationAttemptId();
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(attemptId.toString());
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
rm.getRMContext().getRMApps().get(attemptId.getApplicationId())
.getRMAppAttempt(attemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
@Override
public AllocateResponse run() throws Exception {
return amService.allocate(req);
}
});
}
@Test
public void testARRMResponseId() throws Exception {
@ -81,22 +91,22 @@ public class TestAMRMRPCResponseId {
AllocateRequest allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), 0, 0F, null, null, null);
AllocateResponse response = amService.allocate(allocateRequest);
AllocateResponse response = allocate(allocateRequest);
Assert.assertEquals(1, response.getResponseId());
Assert.assertTrue(response.getAMCommand() == null);
allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), response.getResponseId(), 0F, null, null, null);
response = amService.allocate(allocateRequest);
response = allocate(allocateRequest);
Assert.assertEquals(2, response.getResponseId());
/* try resending */
response = amService.allocate(allocateRequest);
response = allocate(allocateRequest);
Assert.assertEquals(2, response.getResponseId());
/** try sending old request again **/
allocateRequest = AllocateRequest.newInstance(attempt
.getAppAttemptId(), 0, 0F, null, null, null);
response = amService.allocate(allocateRequest);
response = allocate(allocateRequest);
Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC);
}
}

View File

@ -30,6 +30,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -296,7 +297,8 @@ public class TestSchedulerUtils {
UserGroupInformation currentUser =
UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
Credentials credentials = containerManager.getContainerCredentials();
currentUser.addCredentials(credentials);
ApplicationMasterProtocol client = currentUser
.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
@Override

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
import javax.crypto.SecretKey;
@ -26,7 +28,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@ -46,17 +47,29 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class TestAMRMTokens {
private static final Log LOG = LogFactory.getLog(TestAMRMTokens.class);
private static final Configuration confWithSecurityEnabled =
new Configuration();
static {
confWithSecurityEnabled.set(
private final Configuration conf;
@Parameters
public static Collection<Object[]> configs() {
Configuration conf = new Configuration();
Configuration confWithSecurity = new Configuration();
confWithSecurity.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(confWithSecurityEnabled);
return Arrays.asList(new Object[][] {{ conf }, { confWithSecurity } });
}
public TestAMRMTokens(Configuration conf) {
this.conf = conf;
UserGroupInformation.setConfiguration(conf);
}
/**
@ -70,7 +83,7 @@ public class TestAMRMTokens {
MyContainerManager containerManager = new MyContainerManager();
final MockRM rm =
new MockRMWithAMS(confWithSecurityEnabled, containerManager);
new MockRMWithAMS(conf, containerManager);
rm.start();
final Configuration conf = rm.getConfig();
@ -85,11 +98,11 @@ public class TestAMRMTokens {
nm1.nodeHeartbeat(true);
int waitCount = 0;
while (containerManager.amTokens == null && waitCount++ < 20) {
while (containerManager.containerTokens == null && waitCount++ < 20) {
LOG.info("Waiting for AM Launch to happen..");
Thread.sleep(1000);
}
Assert.assertNotNull(containerManager.amTokens);
Assert.assertNotNull(containerManager.containerTokens);
RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
@ -98,11 +111,7 @@ public class TestAMRMTokens {
UserGroupInformation currentUser =
UserGroupInformation
.createRemoteUser(applicationAttemptId.toString());
Credentials credentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();
containerManager.amTokens.rewind();
buf.reset(containerManager.amTokens);
credentials.readTokenStorageStream(buf);
Credentials credentials = containerManager.getContainerCredentials();
currentUser.addCredentials(credentials);
rmClient = createRMClient(rm, conf, rpc, currentUser);
@ -162,7 +171,7 @@ public class TestAMRMTokens {
MyContainerManager containerManager = new MyContainerManager();
final MockRM rm =
new MockRMWithAMS(confWithSecurityEnabled, containerManager);
new MockRMWithAMS(conf, containerManager);
rm.start();
final Configuration conf = rm.getConfig();
@ -177,11 +186,11 @@ public class TestAMRMTokens {
nm1.nodeHeartbeat(true);
int waitCount = 0;
while (containerManager.amTokens == null && waitCount++ < 20) {
while (containerManager.containerTokens == null && waitCount++ < 20) {
LOG.info("Waiting for AM Launch to happen..");
Thread.sleep(1000);
}
Assert.assertNotNull(containerManager.amTokens);
Assert.assertNotNull(containerManager.containerTokens);
RMAppAttempt attempt = app.getCurrentAppAttempt();
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
@ -190,11 +199,7 @@ public class TestAMRMTokens {
UserGroupInformation currentUser =
UserGroupInformation
.createRemoteUser(applicationAttemptId.toString());
Credentials credentials = new Credentials();
DataInputByteBuffer buf = new DataInputByteBuffer();
containerManager.amTokens.rewind();
buf.reset(containerManager.amTokens);
credentials.readTokenStorageStream(buf);
Credentials credentials = containerManager.getContainerCredentials();
currentUser.addCredentials(credentials);
rmClient = createRMClient(rm, conf, rpc, currentUser);