Merge -c from trunk to branch-2 to fix YARN-701. Use application tokens irrespective of secure or non-secure mode. Contributed by Vinod K V.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1504605 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f736ef62b2
commit
d82a3cd227
|
@ -71,6 +71,8 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
@ -1392,6 +1395,18 @@ public class TestRMContainerAllocator {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void register() {
|
protected void register() {
|
||||||
|
ApplicationAttemptId attemptId = getContext().getApplicationAttemptId();
|
||||||
|
UserGroupInformation ugi =
|
||||||
|
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||||
|
Token<AMRMTokenIdentifier> token =
|
||||||
|
rm.getRMContext().getRMApps().get(attemptId.getApplicationId())
|
||||||
|
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||||
|
try {
|
||||||
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new YarnRuntimeException(e);
|
||||||
|
}
|
||||||
|
UserGroupInformation.setLoginUser(ugi);
|
||||||
super.register();
|
super.register();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -233,6 +233,9 @@ Release 2.1.0-beta - 2013-07-02
|
||||||
YARN-727. ClientRMProtocol.getAllApplications should accept ApplicationType as
|
YARN-727. ClientRMProtocol.getAllApplications should accept ApplicationType as
|
||||||
a parameter. (Xuan Gong via hitesh)
|
a parameter. (Xuan Gong via hitesh)
|
||||||
|
|
||||||
|
YARN-701. Use application tokens irrespective of secure or non-secure
|
||||||
|
mode. (vinodkv via acmurthy)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
YARN-482. FS: Extend SchedulingMode to intermediate queues.
|
YARN-482. FS: Extend SchedulingMode to intermediate queues.
|
||||||
|
|
|
@ -40,7 +40,7 @@ import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestUnmanagedAMLauncher {
|
public class TestUnmanagedAMLauncher {
|
||||||
|
/**
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(TestUnmanagedAMLauncher.class);
|
.getLog(TestUnmanagedAMLauncher.class);
|
||||||
|
|
||||||
|
@ -185,5 +185,5 @@ public class TestUnmanagedAMLauncher {
|
||||||
// Expected
|
// Expected
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,10 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -33,11 +36,13 @@ import java.util.TreeSet;
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.Service.STATE;
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
@ -48,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
|
@ -57,12 +63,15 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||||
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||||
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -71,6 +80,7 @@ import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
import org.mortbay.log.Log;
|
||||||
|
|
||||||
public class TestAMRMClient {
|
public class TestAMRMClient {
|
||||||
static Configuration conf = null;
|
static Configuration conf = null;
|
||||||
|
@ -130,11 +140,14 @@ public class TestAMRMClient {
|
||||||
// Set the queue to which this application is to be submitted in the RM
|
// Set the queue to which this application is to be submitted in the RM
|
||||||
appContext.setQueue("default");
|
appContext.setQueue("default");
|
||||||
// Set up the container launch context for the application master
|
// Set up the container launch context for the application master
|
||||||
ContainerLaunchContext amContainer = Records
|
ContainerLaunchContext amContainer =
|
||||||
.newRecord(ContainerLaunchContext.class);
|
BuilderUtils.newContainerLaunchContext(
|
||||||
|
Collections.<String, LocalResource> emptyMap(),
|
||||||
|
new HashMap<String, String>(), Arrays.asList("sleep", "100"),
|
||||||
|
new HashMap<String, ByteBuffer>(), null,
|
||||||
|
new HashMap<ApplicationAccessType, String>());
|
||||||
appContext.setAMContainerSpec(amContainer);
|
appContext.setAMContainerSpec(amContainer);
|
||||||
// unmanaged AM
|
appContext.setResource(Resource.newInstance(1024, 1));
|
||||||
appContext.setUnmanagedAM(true);
|
|
||||||
// Create the request to send to the applications manager
|
// Create the request to send to the applications manager
|
||||||
SubmitApplicationRequest appRequest = Records
|
SubmitApplicationRequest appRequest = Records
|
||||||
.newRecord(SubmitApplicationRequest.class);
|
.newRecord(SubmitApplicationRequest.class);
|
||||||
|
@ -143,17 +156,32 @@ public class TestAMRMClient {
|
||||||
yarnClient.submitApplication(appContext);
|
yarnClient.submitApplication(appContext);
|
||||||
|
|
||||||
// wait for app to start
|
// wait for app to start
|
||||||
|
RMAppAttempt appAttempt = null;
|
||||||
while (true) {
|
while (true) {
|
||||||
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
|
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
|
||||||
if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
|
if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
|
||||||
attemptId = appReport.getCurrentApplicationAttemptId();
|
attemptId = appReport.getCurrentApplicationAttemptId();
|
||||||
|
appAttempt =
|
||||||
|
yarnCluster.getResourceManager().getRMContext().getRMApps()
|
||||||
|
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
|
||||||
|
while (true) {
|
||||||
|
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Just dig into the ResourceManager and get the AMRMToken just for the sake
|
||||||
|
// of testing.
|
||||||
|
UserGroupInformation.setLoginUser(UserGroupInformation
|
||||||
|
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
|
||||||
|
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void cancelApp() {
|
public void cancelApp() throws YarnException, IOException {
|
||||||
|
yarnClient.killApplication(attemptId.getApplicationId());
|
||||||
attemptId = null;
|
attemptId = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -403,6 +431,7 @@ public class TestAMRMClient {
|
||||||
int iterationsLeft = 3;
|
int iterationsLeft = 3;
|
||||||
while (allocatedContainerCount < 2
|
while (allocatedContainerCount < 2
|
||||||
&& iterationsLeft-- > 0) {
|
&& iterationsLeft-- > 0) {
|
||||||
|
Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
|
||||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||||
assertTrue(amClient.ask.size() == 0);
|
assertTrue(amClient.ask.size() == 0);
|
||||||
assertTrue(amClient.release.size() == 0);
|
assertTrue(amClient.release.size() == 0);
|
||||||
|
|
|
@ -33,6 +33,7 @@ import java.util.TreeSet;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.Service.STATE;
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
|
@ -53,13 +54,15 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||||
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||||
import org.apache.hadoop.yarn.client.api.NMClient;
|
import org.apache.hadoop.yarn.client.api.NMClient;
|
||||||
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
||||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -122,11 +125,20 @@ public class TestNMClient {
|
||||||
|
|
||||||
// wait for app to start
|
// wait for app to start
|
||||||
int iterationsLeft = 30;
|
int iterationsLeft = 30;
|
||||||
|
RMAppAttempt appAttempt = null;
|
||||||
while (iterationsLeft > 0) {
|
while (iterationsLeft > 0) {
|
||||||
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
|
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
|
||||||
if (appReport.getYarnApplicationState() ==
|
if (appReport.getYarnApplicationState() ==
|
||||||
YarnApplicationState.ACCEPTED) {
|
YarnApplicationState.ACCEPTED) {
|
||||||
attemptId = appReport.getCurrentApplicationAttemptId();
|
attemptId = appReport.getCurrentApplicationAttemptId();
|
||||||
|
appAttempt =
|
||||||
|
yarnCluster.getResourceManager().getRMContext().getRMApps()
|
||||||
|
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
|
||||||
|
while (true) {
|
||||||
|
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
sleep(1000);
|
sleep(1000);
|
||||||
|
@ -136,6 +148,12 @@ public class TestNMClient {
|
||||||
fail("Application hasn't bee started");
|
fail("Application hasn't bee started");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Just dig into the ResourceManager and get the AMRMToken just for the sake
|
||||||
|
// of testing.
|
||||||
|
UserGroupInformation.setLoginUser(UserGroupInformation
|
||||||
|
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
|
||||||
|
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
|
||||||
|
|
||||||
// start am rm client
|
// start am rm client
|
||||||
rmClient =
|
rmClient =
|
||||||
(AMRMClientImpl<ContainerRequest>) AMRMClient
|
(AMRMClientImpl<ContainerRequest>) AMRMClient
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
|
@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||||
|
@ -103,7 +105,6 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
|
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
|
||||||
this.rScheduler = scheduler;
|
this.rScheduler = scheduler;
|
||||||
this.resync.setAMCommand(AMCommand.AM_RESYNC);
|
this.resync.setAMCommand(AMCommand.AM_RESYNC);
|
||||||
// this.reboot.containers = new ArrayList<Container>();
|
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,10 +118,17 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||||
|
|
||||||
|
Configuration serverConf = conf;
|
||||||
|
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
// If the auth is not-simple, enforce it to be token-based.
|
||||||
|
serverConf = new Configuration(conf);
|
||||||
|
serverConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
UserGroupInformation.AuthenticationMethod.TOKEN.toString());
|
||||||
|
}
|
||||||
this.server =
|
this.server =
|
||||||
rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress,
|
rpc.getServer(ApplicationMasterProtocol.class, this, masterServiceAddress,
|
||||||
conf, this.rmContext.getAMRMTokenSecretManager(),
|
serverConf, this.rmContext.getAMRMTokenSecretManager(),
|
||||||
conf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
|
serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
|
||||||
|
|
||||||
// Enable service authorization?
|
// Enable service authorization?
|
||||||
|
@ -142,13 +150,26 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
return this.bindAddress;
|
return this.bindAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
|
||||||
|
// currently sets only the required id, but iterate through anyways just to be
|
||||||
|
// sure.
|
||||||
|
private AMRMTokenIdentifier selectAMRMTokenIdentifier(
|
||||||
|
UserGroupInformation remoteUgi) throws IOException {
|
||||||
|
AMRMTokenIdentifier result = null;
|
||||||
|
Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
|
||||||
|
for (TokenIdentifier tokenId : tokenIds) {
|
||||||
|
if (tokenId instanceof AMRMTokenIdentifier) {
|
||||||
|
result = (AMRMTokenIdentifier) tokenId;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
private void authorizeRequest(ApplicationAttemptId appAttemptID)
|
private void authorizeRequest(ApplicationAttemptId appAttemptID)
|
||||||
throws YarnException {
|
throws YarnException {
|
||||||
|
|
||||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
String appAttemptIDStr = appAttemptID.toString();
|
String appAttemptIDStr = appAttemptID.toString();
|
||||||
|
|
||||||
UserGroupInformation remoteUgi;
|
UserGroupInformation remoteUgi;
|
||||||
|
@ -162,9 +183,33 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
throw RPCUtil.getRemoteException(msg);
|
throw RPCUtil.getRemoteException(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!remoteUgi.getUserName().equals(appAttemptIDStr)) {
|
boolean tokenFound = false;
|
||||||
|
String message = "";
|
||||||
|
AMRMTokenIdentifier appTokenIdentifier = null;
|
||||||
|
try {
|
||||||
|
appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
|
||||||
|
if (appTokenIdentifier == null) {
|
||||||
|
tokenFound = false;
|
||||||
|
message = "No AMRMToken found for " + appAttemptIDStr;
|
||||||
|
} else {
|
||||||
|
tokenFound = true;
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
tokenFound = false;
|
||||||
|
message =
|
||||||
|
"Got exception while looking for AMRMToken for " + appAttemptIDStr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!tokenFound) {
|
||||||
|
LOG.warn(message);
|
||||||
|
throw RPCUtil.getRemoteException(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
ApplicationAttemptId remoteApplicationAttemptId =
|
||||||
|
appTokenIdentifier.getApplicationAttemptId();
|
||||||
|
if (!remoteApplicationAttemptId.equals(appAttemptID)) {
|
||||||
String msg = "Unauthorized request from ApplicationMaster. "
|
String msg = "Unauthorized request from ApplicationMaster. "
|
||||||
+ "Expected ApplicationAttemptID: " + remoteUgi.getUserName()
|
+ "Expected ApplicationAttemptID: " + remoteApplicationAttemptId
|
||||||
+ " Found: " + appAttemptIDStr;
|
+ " Found: " + appAttemptIDStr;
|
||||||
LOG.warn(msg);
|
LOG.warn(msg);
|
||||||
throw RPCUtil.getRemoteException(msg);
|
throw RPCUtil.getRemoteException(msg);
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class RMContextImpl implements RMContext {
|
||||||
private RMStateStore stateStore = null;
|
private RMStateStore stateStore = null;
|
||||||
private ContainerAllocationExpirer containerAllocationExpirer;
|
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||||
private final DelegationTokenRenewer tokenRenewer;
|
private final DelegationTokenRenewer tokenRenewer;
|
||||||
private final AMRMTokenSecretManager appTokenSecretManager;
|
private final AMRMTokenSecretManager amRMTokenSecretManager;
|
||||||
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
||||||
private final NMTokenSecretManagerInRM nmTokenSecretManager;
|
private final NMTokenSecretManagerInRM nmTokenSecretManager;
|
||||||
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
|
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
|
||||||
|
@ -68,7 +68,7 @@ public class RMContextImpl implements RMContext {
|
||||||
AMLivelinessMonitor amLivelinessMonitor,
|
AMLivelinessMonitor amLivelinessMonitor,
|
||||||
AMLivelinessMonitor amFinishingMonitor,
|
AMLivelinessMonitor amFinishingMonitor,
|
||||||
DelegationTokenRenewer tokenRenewer,
|
DelegationTokenRenewer tokenRenewer,
|
||||||
AMRMTokenSecretManager appTokenSecretManager,
|
AMRMTokenSecretManager amRMTokenSecretManager,
|
||||||
RMContainerTokenSecretManager containerTokenSecretManager,
|
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||||
NMTokenSecretManagerInRM nmTokenSecretManager,
|
NMTokenSecretManagerInRM nmTokenSecretManager,
|
||||||
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
|
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
|
||||||
|
@ -78,7 +78,7 @@ public class RMContextImpl implements RMContext {
|
||||||
this.amLivelinessMonitor = amLivelinessMonitor;
|
this.amLivelinessMonitor = amLivelinessMonitor;
|
||||||
this.amFinishingMonitor = amFinishingMonitor;
|
this.amFinishingMonitor = amFinishingMonitor;
|
||||||
this.tokenRenewer = tokenRenewer;
|
this.tokenRenewer = tokenRenewer;
|
||||||
this.appTokenSecretManager = appTokenSecretManager;
|
this.amRMTokenSecretManager = amRMTokenSecretManager;
|
||||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||||
this.nmTokenSecretManager = nmTokenSecretManager;
|
this.nmTokenSecretManager = nmTokenSecretManager;
|
||||||
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
|
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
|
||||||
|
@ -156,7 +156,7 @@ public class RMContextImpl implements RMContext {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
|
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
|
||||||
return this.appTokenSecretManager;
|
return this.amRMTokenSecretManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
|
@ -194,29 +193,27 @@ public class AMLauncher implements Runnable {
|
||||||
String.valueOf(rmContext.getRMApps().get(
|
String.valueOf(rmContext.getRMApps().get(
|
||||||
applicationId).getMaxAppAttempts()));
|
applicationId).getMaxAppAttempts()));
|
||||||
|
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
// TODO: Security enabled/disabled info should come from RM.
|
// TODO: Security enabled/disabled info should come from RM.
|
||||||
|
|
||||||
Credentials credentials = new Credentials();
|
|
||||||
|
|
||||||
DataInputByteBuffer dibb = new DataInputByteBuffer();
|
DataInputByteBuffer dibb = new DataInputByteBuffer();
|
||||||
if (container.getTokens() != null) {
|
if (container.getTokens() != null) {
|
||||||
// TODO: Don't do this kind of checks everywhere.
|
// TODO: Don't do this kind of checks everywhere.
|
||||||
dibb.reset(container.getTokens());
|
dibb.reset(container.getTokens());
|
||||||
credentials.readTokenStorageStream(dibb);
|
credentials.readTokenStorageStream(dibb);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Add application token
|
// Add AMRMToken
|
||||||
Token<AMRMTokenIdentifier> amrmToken =
|
Token<AMRMTokenIdentifier> amrmToken = application.getAMRMToken();
|
||||||
application.getAMRMToken();
|
|
||||||
if (amrmToken != null) {
|
if (amrmToken != null) {
|
||||||
credentials.addToken(amrmToken.getService(), amrmToken);
|
credentials.addToken(amrmToken.getService(), amrmToken);
|
||||||
}
|
}
|
||||||
DataOutputBuffer dob = new DataOutputBuffer();
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
credentials.writeTokenStorageToStream(dob);
|
credentials.writeTokenStorageToStream(dob);
|
||||||
container.setTokens(ByteBuffer.wrap(dob.getData(), 0,
|
container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
|
||||||
dob.getLength()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -682,9 +682,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
this.clientToAMToken =
|
this.clientToAMToken =
|
||||||
clientToAMTokenSelector.selectToken(new Text(),
|
clientToAMTokenSelector.selectToken(new Text(),
|
||||||
appAttemptTokens.getAllTokens());
|
appAttemptTokens.getAllTokens());
|
||||||
|
}
|
||||||
|
|
||||||
InetSocketAddress serviceAddr = conf.getSocketAddr(
|
InetSocketAddress serviceAddr =
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||||
AMRMTokenSelector appTokenSelector = new AMRMTokenSelector();
|
AMRMTokenSelector appTokenSelector = new AMRMTokenSelector();
|
||||||
|
@ -693,12 +694,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
SecurityUtil.buildTokenService(serviceAddr),
|
SecurityUtil.buildTokenService(serviceAddr),
|
||||||
appAttemptTokens.getAllTokens());
|
appAttemptTokens.getAllTokens());
|
||||||
|
|
||||||
// For now, no need to populate tokens back to
|
// For now, no need to populate tokens back to AMRMTokenSecretManager,
|
||||||
// AMRMTokenSecretManager, because running attempts are rebooted
|
// because running attempts are rebooted. Later in work-preserve restart,
|
||||||
// Later in work-preserve restart, we'll create NEW->RUNNING transition
|
// we'll create NEW->RUNNING transition in which the restored tokens will be
|
||||||
// in which the restored tokens will be added to the secret manager
|
// added to the secret manager
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class BaseTransition implements
|
private static class BaseTransition implements
|
||||||
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
|
SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> {
|
||||||
|
|
||||||
|
@ -730,16 +731,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
new Token<ClientToAMTokenIdentifier>(new ClientToAMTokenIdentifier(
|
new Token<ClientToAMTokenIdentifier>(new ClientToAMTokenIdentifier(
|
||||||
appAttempt.applicationAttemptId),
|
appAttempt.applicationAttemptId),
|
||||||
appAttempt.rmContext.getClientToAMTokenSecretManager());
|
appAttempt.rmContext.getClientToAMTokenSecretManager());
|
||||||
|
}
|
||||||
|
|
||||||
// create application token
|
// create AMRMToken
|
||||||
AMRMTokenIdentifier id =
|
AMRMTokenIdentifier id =
|
||||||
new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
|
new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
|
||||||
Token<AMRMTokenIdentifier> amRmToken =
|
Token<AMRMTokenIdentifier> amRmToken =
|
||||||
new Token<AMRMTokenIdentifier>(id,
|
new Token<AMRMTokenIdentifier>(id,
|
||||||
appAttempt.rmContext.getAMRMTokenSecretManager());
|
appAttempt.rmContext.getAMRMTokenSecretManager());
|
||||||
InetSocketAddress serviceAddr =
|
InetSocketAddress serviceAddr =
|
||||||
appAttempt.conf.getSocketAddr(
|
appAttempt.conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||||
// normally the client should set the service after acquiring the
|
// normally the client should set the service after acquiring the
|
||||||
|
@ -748,8 +749,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
|
|
||||||
appAttempt.amrmToken = amRmToken;
|
appAttempt.amrmToken = amRmToken;
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the application to the scheduler
|
// Add the application to the scheduler
|
||||||
appAttempt.eventHandler.handle(
|
appAttempt.eventHandler.handle(
|
||||||
new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,
|
new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,
|
||||||
|
|
|
@ -18,11 +18,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
@ -35,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
@ -89,14 +94,30 @@ public class MockAM {
|
||||||
waitForState(RMAppAttemptState.LAUNCHED);
|
waitForState(RMAppAttemptState.LAUNCHED);
|
||||||
}
|
}
|
||||||
responseId = 0;
|
responseId = 0;
|
||||||
RegisterApplicationMasterRequest req =
|
final RegisterApplicationMasterRequest req =
|
||||||
Records.newRecord(RegisterApplicationMasterRequest.class);
|
Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||||
req.setApplicationAttemptId(attemptId);
|
req.setApplicationAttemptId(attemptId);
|
||||||
req.setHost("");
|
req.setHost("");
|
||||||
req.setRpcPort(1);
|
req.setRpcPort(1);
|
||||||
req.setTrackingUrl("");
|
req.setTrackingUrl("");
|
||||||
|
UserGroupInformation ugi =
|
||||||
|
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||||
|
Token<AMRMTokenIdentifier> token =
|
||||||
|
context.getRMApps().get(attemptId.getApplicationId())
|
||||||
|
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||||
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||||
|
try {
|
||||||
|
return ugi
|
||||||
|
.doAs(new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
|
||||||
|
@Override
|
||||||
|
public RegisterApplicationMasterResponse run() throws Exception {
|
||||||
return amRMProtocol.registerApplicationMaster(req);
|
return amRMProtocol.registerApplicationMaster(req);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
throw (Exception) e.getCause();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void addRequests(String[] hosts, int memory, int priority,
|
public void addRequests(String[] hosts, int memory, int priority,
|
||||||
int containers) throws Exception {
|
int containers) throws Exception {
|
||||||
|
@ -153,18 +174,46 @@ public class MockAM {
|
||||||
public AllocateResponse allocate(
|
public AllocateResponse allocate(
|
||||||
List<ResourceRequest> resourceRequest, List<ContainerId> releases)
|
List<ResourceRequest> resourceRequest, List<ContainerId> releases)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
AllocateRequest req = AllocateRequest.newInstance(attemptId,
|
final AllocateRequest req = AllocateRequest.newInstance(attemptId,
|
||||||
++responseId, 0F, resourceRequest, releases, null);
|
++responseId, 0F, resourceRequest, releases, null);
|
||||||
|
UserGroupInformation ugi =
|
||||||
|
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||||
|
Token<AMRMTokenIdentifier> token =
|
||||||
|
context.getRMApps().get(attemptId.getApplicationId())
|
||||||
|
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||||
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||||
|
try {
|
||||||
|
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
|
||||||
|
@Override
|
||||||
|
public AllocateResponse run() throws Exception {
|
||||||
return amRMProtocol.allocate(req);
|
return amRMProtocol.allocate(req);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
throw (Exception) e.getCause();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void unregisterAppAttempt() throws Exception {
|
public void unregisterAppAttempt() throws Exception {
|
||||||
waitForState(RMAppAttemptState.RUNNING);
|
waitForState(RMAppAttemptState.RUNNING);
|
||||||
FinishApplicationMasterRequest req = Records.newRecord(FinishApplicationMasterRequest.class);
|
final FinishApplicationMasterRequest req =
|
||||||
|
Records.newRecord(FinishApplicationMasterRequest.class);
|
||||||
req.setAppAttemptId(attemptId);
|
req.setAppAttemptId(attemptId);
|
||||||
req.setDiagnostics("");
|
req.setDiagnostics("");
|
||||||
req.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
|
req.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
|
||||||
req.setTrackingUrl("");
|
req.setTrackingUrl("");
|
||||||
|
UserGroupInformation ugi =
|
||||||
|
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||||
|
Token<AMRMTokenIdentifier> token =
|
||||||
|
context.getRMApps().get(attemptId.getApplicationId())
|
||||||
|
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||||
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||||
|
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
|
@Override
|
||||||
|
public Object run() throws Exception {
|
||||||
amRMProtocol.finishApplicationMaster(req);
|
amRMProtocol.finishApplicationMaster(req);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,8 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -54,22 +56,35 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class TestAMAuthorization {
|
public class TestAMAuthorization {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class);
|
private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class);
|
||||||
|
|
||||||
private static final Configuration confWithSecurityEnabled =
|
private final Configuration conf;
|
||||||
new Configuration();
|
|
||||||
static {
|
@Parameters
|
||||||
confWithSecurityEnabled.set(
|
public static Collection<Object[]> configs() {
|
||||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
Configuration conf = new Configuration();
|
||||||
UserGroupInformation.setConfiguration(confWithSecurityEnabled);
|
Configuration confWithSecurity = new Configuration();
|
||||||
|
confWithSecurity.set(
|
||||||
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
UserGroupInformation.AuthenticationMethod.KERBEROS.toString());
|
||||||
|
return Arrays.asList(new Object[][] {{ conf }, { confWithSecurity} });
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestAMAuthorization(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final class MyContainerManager implements ContainerManagementProtocol {
|
public static final class MyContainerManager implements ContainerManagementProtocol {
|
||||||
|
|
||||||
public ByteBuffer amTokens;
|
public ByteBuffer containerTokens;
|
||||||
|
|
||||||
public MyContainerManager() {
|
public MyContainerManager() {
|
||||||
}
|
}
|
||||||
|
@ -78,23 +93,30 @@ public class TestAMAuthorization {
|
||||||
public StartContainerResponse
|
public StartContainerResponse
|
||||||
startContainer(StartContainerRequest request)
|
startContainer(StartContainerRequest request)
|
||||||
throws YarnException {
|
throws YarnException {
|
||||||
amTokens = request.getContainerLaunchContext().getTokens();
|
containerTokens = request.getContainerLaunchContext().getTokens();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StopContainerResponse stopContainer(StopContainerRequest request)
|
public StopContainerResponse stopContainer(StopContainerRequest request)
|
||||||
throws YarnException {
|
throws YarnException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetContainerStatusResponse getContainerStatus(
|
public GetContainerStatusResponse getContainerStatus(
|
||||||
GetContainerStatusRequest request) throws YarnException {
|
GetContainerStatusRequest request) throws YarnException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Credentials getContainerCredentials() throws IOException {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
DataInputByteBuffer buf = new DataInputByteBuffer();
|
||||||
|
containerTokens.rewind();
|
||||||
|
buf.reset(containerTokens);
|
||||||
|
credentials.readTokenStorageStream(buf);
|
||||||
|
return credentials;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
|
public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
|
||||||
|
@ -118,7 +140,7 @@ public class TestAMAuthorization {
|
||||||
public void testAuthorizedAccess() throws Exception {
|
public void testAuthorizedAccess() throws Exception {
|
||||||
MyContainerManager containerManager = new MyContainerManager();
|
MyContainerManager containerManager = new MyContainerManager();
|
||||||
final MockRM rm =
|
final MockRM rm =
|
||||||
new MockRMWithAMS(confWithSecurityEnabled, containerManager);
|
new MockRMWithAMS(conf, containerManager);
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
||||||
|
@ -131,11 +153,11 @@ public class TestAMAuthorization {
|
||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
|
|
||||||
int waitCount = 0;
|
int waitCount = 0;
|
||||||
while (containerManager.amTokens == null && waitCount++ < 20) {
|
while (containerManager.containerTokens == null && waitCount++ < 20) {
|
||||||
LOG.info("Waiting for AM Launch to happen..");
|
LOG.info("Waiting for AM Launch to happen..");
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
Assert.assertNotNull(containerManager.amTokens);
|
Assert.assertNotNull(containerManager.containerTokens);
|
||||||
|
|
||||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||||
|
@ -147,11 +169,7 @@ public class TestAMAuthorization {
|
||||||
|
|
||||||
UserGroupInformation currentUser = UserGroupInformation
|
UserGroupInformation currentUser = UserGroupInformation
|
||||||
.createRemoteUser(applicationAttemptId.toString());
|
.createRemoteUser(applicationAttemptId.toString());
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = containerManager.getContainerCredentials();
|
||||||
DataInputByteBuffer buf = new DataInputByteBuffer();
|
|
||||||
containerManager.amTokens.rewind();
|
|
||||||
buf.reset(containerManager.amTokens);
|
|
||||||
credentials.readTokenStorageStream(buf);
|
|
||||||
currentUser.addCredentials(credentials);
|
currentUser.addCredentials(credentials);
|
||||||
|
|
||||||
ApplicationMasterProtocol client = currentUser
|
ApplicationMasterProtocol client = currentUser
|
||||||
|
@ -169,8 +187,10 @@ public class TestAMAuthorization {
|
||||||
RegisterApplicationMasterResponse response =
|
RegisterApplicationMasterResponse response =
|
||||||
client.registerApplicationMaster(request);
|
client.registerApplicationMaster(request);
|
||||||
Assert.assertNotNull(response.getClientToAMTokenMasterKey());
|
Assert.assertNotNull(response.getClientToAMTokenMasterKey());
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
Assert
|
Assert
|
||||||
.assertTrue(response.getClientToAMTokenMasterKey().array().length > 0);
|
.assertTrue(response.getClientToAMTokenMasterKey().array().length > 0);
|
||||||
|
}
|
||||||
Assert.assertEquals("Register response has bad ACLs", "*",
|
Assert.assertEquals("Register response has bad ACLs", "*",
|
||||||
response.getApplicationACLs().get(ApplicationAccessType.VIEW_APP));
|
response.getApplicationACLs().get(ApplicationAccessType.VIEW_APP));
|
||||||
|
|
||||||
|
@ -180,7 +200,7 @@ public class TestAMAuthorization {
|
||||||
@Test
|
@Test
|
||||||
public void testUnauthorizedAccess() throws Exception {
|
public void testUnauthorizedAccess() throws Exception {
|
||||||
MyContainerManager containerManager = new MyContainerManager();
|
MyContainerManager containerManager = new MyContainerManager();
|
||||||
MockRM rm = new MockRMWithAMS(confWithSecurityEnabled, containerManager);
|
MockRM rm = new MockRMWithAMS(conf, containerManager);
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
MockNM nm1 = rm.registerNode("localhost:1234", 5120);
|
||||||
|
@ -190,11 +210,11 @@ public class TestAMAuthorization {
|
||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
|
|
||||||
int waitCount = 0;
|
int waitCount = 0;
|
||||||
while (containerManager.amTokens == null && waitCount++ < 40) {
|
while (containerManager.containerTokens == null && waitCount++ < 40) {
|
||||||
LOG.info("Waiting for AM Launch to happen..");
|
LOG.info("Waiting for AM Launch to happen..");
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
Assert.assertNotNull(containerManager.amTokens);
|
Assert.assertNotNull(containerManager.containerTokens);
|
||||||
|
|
||||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||||
|
@ -229,17 +249,19 @@ public class TestAMAuthorization {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Because there are no tokens, the request should be rejected as the
|
// Because there are no tokens, the request should be rejected as the
|
||||||
// server side will assume we are trying simple auth.
|
// server side will assume we are trying simple auth.
|
||||||
|
String availableAuthMethods;
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
availableAuthMethods = "[TOKEN, KERBEROS]";
|
||||||
|
} else {
|
||||||
|
availableAuthMethods = "[TOKEN]";
|
||||||
|
}
|
||||||
Assert.assertTrue(e.getCause().getMessage().contains(
|
Assert.assertTrue(e.getCause().getMessage().contains(
|
||||||
"SIMPLE authentication is not enabled. "
|
"SIMPLE authentication is not enabled. "
|
||||||
+ "Available:[TOKEN, KERBEROS]"));
|
+ "Available:" + availableAuthMethods));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now try to validate invalid authorization.
|
// Now try to validate invalid authorization.
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = containerManager.getContainerCredentials();
|
||||||
DataInputByteBuffer buf = new DataInputByteBuffer();
|
|
||||||
containerManager.amTokens.rewind();
|
|
||||||
buf.reset(containerManager.amTokens);
|
|
||||||
credentials.readTokenStorageStream(buf);
|
|
||||||
currentUser.addCredentials(credentials);
|
currentUser.addCredentials(credentials);
|
||||||
|
|
||||||
// Create a client to the RM.
|
// Create a client to the RM.
|
||||||
|
@ -252,7 +274,8 @@ public class TestAMAuthorization {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
request = Records.newRecord(RegisterApplicationMasterRequest.class);
|
request =
|
||||||
|
Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||||
ApplicationAttemptId otherAppAttemptId = BuilderUtils
|
ApplicationAttemptId otherAppAttemptId = BuilderUtils
|
||||||
.newApplicationAttemptId(applicationAttemptId.getApplicationId(), 42);
|
.newApplicationAttemptId(applicationAttemptId.getApplicationId(), 42);
|
||||||
request.setApplicationAttemptId(otherAppAttemptId);
|
request.setApplicationAttemptId(otherAppAttemptId);
|
||||||
|
|
|
@ -18,17 +18,22 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
||||||
|
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
|
@ -87,6 +92,22 @@ public class TestAMRMRPCNodeUpdates {
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private AllocateResponse allocate(final AllocateRequest req) throws Exception {
|
||||||
|
ApplicationAttemptId attemptId = req.getApplicationAttemptId();
|
||||||
|
UserGroupInformation ugi =
|
||||||
|
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||||
|
Token<AMRMTokenIdentifier> token =
|
||||||
|
rm.getRMContext().getRMApps().get(attemptId.getApplicationId())
|
||||||
|
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||||
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||||
|
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
|
||||||
|
@Override
|
||||||
|
public AllocateResponse run() throws Exception {
|
||||||
|
return amService.allocate(req);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAMRMUnusableNodes() throws Exception {
|
public void testAMRMUnusableNodes() throws Exception {
|
||||||
|
|
||||||
|
@ -109,7 +130,7 @@ public class TestAMRMRPCNodeUpdates {
|
||||||
// allocate request returns no updated node
|
// allocate request returns no updated node
|
||||||
AllocateRequest allocateRequest1 = AllocateRequest.newInstance(attempt1
|
AllocateRequest allocateRequest1 = AllocateRequest.newInstance(attempt1
|
||||||
.getAppAttemptId(), 0, 0F, null, null, null);
|
.getAppAttemptId(), 0, 0F, null, null, null);
|
||||||
AllocateResponse response1 = amService.allocate(allocateRequest1);
|
AllocateResponse response1 = allocate(allocateRequest1);
|
||||||
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
|
List<NodeReport> updatedNodes = response1.getUpdatedNodes();
|
||||||
Assert.assertEquals(0, updatedNodes.size());
|
Assert.assertEquals(0, updatedNodes.size());
|
||||||
|
|
||||||
|
@ -118,7 +139,7 @@ public class TestAMRMRPCNodeUpdates {
|
||||||
// allocate request returns updated node
|
// allocate request returns updated node
|
||||||
allocateRequest1 = AllocateRequest.newInstance(attempt1
|
allocateRequest1 = AllocateRequest.newInstance(attempt1
|
||||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
|
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
|
||||||
response1 = amService.allocate(allocateRequest1);
|
response1 = allocate(allocateRequest1);
|
||||||
updatedNodes = response1.getUpdatedNodes();
|
updatedNodes = response1.getUpdatedNodes();
|
||||||
Assert.assertEquals(1, updatedNodes.size());
|
Assert.assertEquals(1, updatedNodes.size());
|
||||||
NodeReport nr = updatedNodes.iterator().next();
|
NodeReport nr = updatedNodes.iterator().next();
|
||||||
|
@ -126,7 +147,7 @@ public class TestAMRMRPCNodeUpdates {
|
||||||
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
|
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
|
||||||
|
|
||||||
// resending the allocate request returns the same result
|
// resending the allocate request returns the same result
|
||||||
response1 = amService.allocate(allocateRequest1);
|
response1 = allocate(allocateRequest1);
|
||||||
updatedNodes = response1.getUpdatedNodes();
|
updatedNodes = response1.getUpdatedNodes();
|
||||||
Assert.assertEquals(1, updatedNodes.size());
|
Assert.assertEquals(1, updatedNodes.size());
|
||||||
nr = updatedNodes.iterator().next();
|
nr = updatedNodes.iterator().next();
|
||||||
|
@ -138,7 +159,7 @@ public class TestAMRMRPCNodeUpdates {
|
||||||
// subsequent allocate request returns delta
|
// subsequent allocate request returns delta
|
||||||
allocateRequest1 = AllocateRequest.newInstance(attempt1
|
allocateRequest1 = AllocateRequest.newInstance(attempt1
|
||||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
|
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
|
||||||
response1 = amService.allocate(allocateRequest1);
|
response1 = allocate(allocateRequest1);
|
||||||
updatedNodes = response1.getUpdatedNodes();
|
updatedNodes = response1.getUpdatedNodes();
|
||||||
Assert.assertEquals(1, updatedNodes.size());
|
Assert.assertEquals(1, updatedNodes.size());
|
||||||
nr = updatedNodes.iterator().next();
|
nr = updatedNodes.iterator().next();
|
||||||
|
@ -158,7 +179,7 @@ public class TestAMRMRPCNodeUpdates {
|
||||||
// allocate request returns no updated node
|
// allocate request returns no updated node
|
||||||
AllocateRequest allocateRequest2 = AllocateRequest.newInstance(attempt2
|
AllocateRequest allocateRequest2 = AllocateRequest.newInstance(attempt2
|
||||||
.getAppAttemptId(), 0, 0F, null, null, null);
|
.getAppAttemptId(), 0, 0F, null, null, null);
|
||||||
AllocateResponse response2 = amService.allocate(allocateRequest2);
|
AllocateResponse response2 = allocate(allocateRequest2);
|
||||||
updatedNodes = response2.getUpdatedNodes();
|
updatedNodes = response2.getUpdatedNodes();
|
||||||
Assert.assertEquals(0, updatedNodes.size());
|
Assert.assertEquals(0, updatedNodes.size());
|
||||||
|
|
||||||
|
@ -167,7 +188,7 @@ public class TestAMRMRPCNodeUpdates {
|
||||||
// both AM's should get delta updated nodes
|
// both AM's should get delta updated nodes
|
||||||
allocateRequest1 = AllocateRequest.newInstance(attempt1
|
allocateRequest1 = AllocateRequest.newInstance(attempt1
|
||||||
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
|
.getAppAttemptId(), response1.getResponseId(), 0F, null, null, null);
|
||||||
response1 = amService.allocate(allocateRequest1);
|
response1 = allocate(allocateRequest1);
|
||||||
updatedNodes = response1.getUpdatedNodes();
|
updatedNodes = response1.getUpdatedNodes();
|
||||||
Assert.assertEquals(1, updatedNodes.size());
|
Assert.assertEquals(1, updatedNodes.size());
|
||||||
nr = updatedNodes.iterator().next();
|
nr = updatedNodes.iterator().next();
|
||||||
|
@ -176,7 +197,7 @@ public class TestAMRMRPCNodeUpdates {
|
||||||
|
|
||||||
allocateRequest2 = AllocateRequest.newInstance(attempt2
|
allocateRequest2 = AllocateRequest.newInstance(attempt2
|
||||||
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null);
|
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null);
|
||||||
response2 = amService.allocate(allocateRequest2);
|
response2 = allocate(allocateRequest2);
|
||||||
updatedNodes = response2.getUpdatedNodes();
|
updatedNodes = response2.getUpdatedNodes();
|
||||||
Assert.assertEquals(1, updatedNodes.size());
|
Assert.assertEquals(1, updatedNodes.size());
|
||||||
nr = updatedNodes.iterator().next();
|
nr = updatedNodes.iterator().next();
|
||||||
|
@ -186,7 +207,7 @@ public class TestAMRMRPCNodeUpdates {
|
||||||
// subsequent allocate calls should return no updated nodes
|
// subsequent allocate calls should return no updated nodes
|
||||||
allocateRequest2 = AllocateRequest.newInstance(attempt2
|
allocateRequest2 = AllocateRequest.newInstance(attempt2
|
||||||
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null);
|
.getAppAttemptId(), response2.getResponseId(), 0F, null, null, null);
|
||||||
response2 = amService.allocate(allocateRequest2);
|
response2 = allocate(allocateRequest2);
|
||||||
updatedNodes = response2.getUpdatedNodes();
|
updatedNodes = response2.getUpdatedNodes();
|
||||||
Assert.assertEquals(0, updatedNodes.size());
|
Assert.assertEquals(0, updatedNodes.size());
|
||||||
|
|
||||||
|
|
|
@ -18,19 +18,20 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
||||||
|
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -39,20 +40,13 @@ import org.junit.Test;
|
||||||
|
|
||||||
public class TestAMRMRPCResponseId {
|
public class TestAMRMRPCResponseId {
|
||||||
|
|
||||||
private static final RecordFactory recordFactory = RecordFactoryProvider
|
|
||||||
.getRecordFactory(null);
|
|
||||||
|
|
||||||
private MockRM rm;
|
private MockRM rm;
|
||||||
ApplicationMasterService amService = null;
|
ApplicationMasterService amService = null;
|
||||||
private ClientRMService clientService;
|
|
||||||
|
|
||||||
private RMContext context;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
this.rm = new MockRM();
|
this.rm = new MockRM();
|
||||||
rm.start();
|
rm.start();
|
||||||
this.clientService = rm.getClientRMService();
|
|
||||||
amService = rm.getApplicationMasterService();
|
amService = rm.getApplicationMasterService();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,6 +57,22 @@ public class TestAMRMRPCResponseId {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private AllocateResponse allocate(final AllocateRequest req) throws Exception {
|
||||||
|
ApplicationAttemptId attemptId = req.getApplicationAttemptId();
|
||||||
|
UserGroupInformation ugi =
|
||||||
|
UserGroupInformation.createRemoteUser(attemptId.toString());
|
||||||
|
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
|
||||||
|
rm.getRMContext().getRMApps().get(attemptId.getApplicationId())
|
||||||
|
.getRMAppAttempt(attemptId).getAMRMToken();
|
||||||
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
||||||
|
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
|
||||||
|
@Override
|
||||||
|
public AllocateResponse run() throws Exception {
|
||||||
|
return amService.allocate(req);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testARRMResponseId() throws Exception {
|
public void testARRMResponseId() throws Exception {
|
||||||
|
|
||||||
|
@ -81,22 +91,22 @@ public class TestAMRMRPCResponseId {
|
||||||
AllocateRequest allocateRequest = AllocateRequest.newInstance(attempt
|
AllocateRequest allocateRequest = AllocateRequest.newInstance(attempt
|
||||||
.getAppAttemptId(), 0, 0F, null, null, null);
|
.getAppAttemptId(), 0, 0F, null, null, null);
|
||||||
|
|
||||||
AllocateResponse response = amService.allocate(allocateRequest);
|
AllocateResponse response = allocate(allocateRequest);
|
||||||
Assert.assertEquals(1, response.getResponseId());
|
Assert.assertEquals(1, response.getResponseId());
|
||||||
Assert.assertTrue(response.getAMCommand() == null);
|
Assert.assertTrue(response.getAMCommand() == null);
|
||||||
allocateRequest = AllocateRequest.newInstance(attempt
|
allocateRequest = AllocateRequest.newInstance(attempt
|
||||||
.getAppAttemptId(), response.getResponseId(), 0F, null, null, null);
|
.getAppAttemptId(), response.getResponseId(), 0F, null, null, null);
|
||||||
|
|
||||||
response = amService.allocate(allocateRequest);
|
response = allocate(allocateRequest);
|
||||||
Assert.assertEquals(2, response.getResponseId());
|
Assert.assertEquals(2, response.getResponseId());
|
||||||
/* try resending */
|
/* try resending */
|
||||||
response = amService.allocate(allocateRequest);
|
response = allocate(allocateRequest);
|
||||||
Assert.assertEquals(2, response.getResponseId());
|
Assert.assertEquals(2, response.getResponseId());
|
||||||
|
|
||||||
/** try sending old request again **/
|
/** try sending old request again **/
|
||||||
allocateRequest = AllocateRequest.newInstance(attempt
|
allocateRequest = AllocateRequest.newInstance(attempt
|
||||||
.getAppAttemptId(), 0, 0F, null, null, null);
|
.getAppAttemptId(), 0, 0F, null, null, null);
|
||||||
response = amService.allocate(allocateRequest);
|
response = allocate(allocateRequest);
|
||||||
Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC);
|
Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Map;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
|
@ -296,7 +297,8 @@ public class TestSchedulerUtils {
|
||||||
|
|
||||||
UserGroupInformation currentUser =
|
UserGroupInformation currentUser =
|
||||||
UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
|
UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
|
||||||
|
Credentials credentials = containerManager.getContainerCredentials();
|
||||||
|
currentUser.addCredentials(credentials);
|
||||||
ApplicationMasterProtocol client = currentUser
|
ApplicationMasterProtocol client = currentUser
|
||||||
.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||||
|
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
|
@ -26,7 +28,6 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
|
@ -46,17 +47,29 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class TestAMRMTokens {
|
public class TestAMRMTokens {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestAMRMTokens.class);
|
private static final Log LOG = LogFactory.getLog(TestAMRMTokens.class);
|
||||||
|
|
||||||
private static final Configuration confWithSecurityEnabled =
|
private final Configuration conf;
|
||||||
new Configuration();
|
|
||||||
static {
|
@Parameters
|
||||||
confWithSecurityEnabled.set(
|
public static Collection<Object[]> configs() {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
Configuration confWithSecurity = new Configuration();
|
||||||
|
confWithSecurity.set(
|
||||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
|
||||||
UserGroupInformation.setConfiguration(confWithSecurityEnabled);
|
return Arrays.asList(new Object[][] {{ conf }, { confWithSecurity } });
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestAMRMTokens(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -70,7 +83,7 @@ public class TestAMRMTokens {
|
||||||
|
|
||||||
MyContainerManager containerManager = new MyContainerManager();
|
MyContainerManager containerManager = new MyContainerManager();
|
||||||
final MockRM rm =
|
final MockRM rm =
|
||||||
new MockRMWithAMS(confWithSecurityEnabled, containerManager);
|
new MockRMWithAMS(conf, containerManager);
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
final Configuration conf = rm.getConfig();
|
final Configuration conf = rm.getConfig();
|
||||||
|
@ -85,11 +98,11 @@ public class TestAMRMTokens {
|
||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
|
|
||||||
int waitCount = 0;
|
int waitCount = 0;
|
||||||
while (containerManager.amTokens == null && waitCount++ < 20) {
|
while (containerManager.containerTokens == null && waitCount++ < 20) {
|
||||||
LOG.info("Waiting for AM Launch to happen..");
|
LOG.info("Waiting for AM Launch to happen..");
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
Assert.assertNotNull(containerManager.amTokens);
|
Assert.assertNotNull(containerManager.containerTokens);
|
||||||
|
|
||||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||||
|
@ -98,11 +111,7 @@ public class TestAMRMTokens {
|
||||||
UserGroupInformation currentUser =
|
UserGroupInformation currentUser =
|
||||||
UserGroupInformation
|
UserGroupInformation
|
||||||
.createRemoteUser(applicationAttemptId.toString());
|
.createRemoteUser(applicationAttemptId.toString());
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = containerManager.getContainerCredentials();
|
||||||
DataInputByteBuffer buf = new DataInputByteBuffer();
|
|
||||||
containerManager.amTokens.rewind();
|
|
||||||
buf.reset(containerManager.amTokens);
|
|
||||||
credentials.readTokenStorageStream(buf);
|
|
||||||
currentUser.addCredentials(credentials);
|
currentUser.addCredentials(credentials);
|
||||||
|
|
||||||
rmClient = createRMClient(rm, conf, rpc, currentUser);
|
rmClient = createRMClient(rm, conf, rpc, currentUser);
|
||||||
|
@ -162,7 +171,7 @@ public class TestAMRMTokens {
|
||||||
|
|
||||||
MyContainerManager containerManager = new MyContainerManager();
|
MyContainerManager containerManager = new MyContainerManager();
|
||||||
final MockRM rm =
|
final MockRM rm =
|
||||||
new MockRMWithAMS(confWithSecurityEnabled, containerManager);
|
new MockRMWithAMS(conf, containerManager);
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
final Configuration conf = rm.getConfig();
|
final Configuration conf = rm.getConfig();
|
||||||
|
@ -177,11 +186,11 @@ public class TestAMRMTokens {
|
||||||
nm1.nodeHeartbeat(true);
|
nm1.nodeHeartbeat(true);
|
||||||
|
|
||||||
int waitCount = 0;
|
int waitCount = 0;
|
||||||
while (containerManager.amTokens == null && waitCount++ < 20) {
|
while (containerManager.containerTokens == null && waitCount++ < 20) {
|
||||||
LOG.info("Waiting for AM Launch to happen..");
|
LOG.info("Waiting for AM Launch to happen..");
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
Assert.assertNotNull(containerManager.amTokens);
|
Assert.assertNotNull(containerManager.containerTokens);
|
||||||
|
|
||||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
|
||||||
|
@ -190,11 +199,7 @@ public class TestAMRMTokens {
|
||||||
UserGroupInformation currentUser =
|
UserGroupInformation currentUser =
|
||||||
UserGroupInformation
|
UserGroupInformation
|
||||||
.createRemoteUser(applicationAttemptId.toString());
|
.createRemoteUser(applicationAttemptId.toString());
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = containerManager.getContainerCredentials();
|
||||||
DataInputByteBuffer buf = new DataInputByteBuffer();
|
|
||||||
containerManager.amTokens.rewind();
|
|
||||||
buf.reset(containerManager.amTokens);
|
|
||||||
credentials.readTokenStorageStream(buf);
|
|
||||||
currentUser.addCredentials(credentials);
|
currentUser.addCredentials(credentials);
|
||||||
|
|
||||||
rmClient = createRMClient(rm, conf, rpc, currentUser);
|
rmClient = createRMClient(rm, conf, rpc, currentUser);
|
||||||
|
|
Loading…
Reference in New Issue