YARN-945. Removed setting of AMRMToken's service from ResourceManager and changed client libraries do it all the time and correctly. Contributed by Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1508232 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8bb035509e
commit
817a654346
|
@ -771,6 +771,9 @@ Release 2.1.0-beta - 2013-07-02
|
|||
YARN-961. Changed ContainerManager to enforce Token auth irrespective of
|
||||
security. (Omkar Vinit Joshi via vinodkv)
|
||||
|
||||
YARN-945. Removed setting of AMRMToken's service from ResourceManager
|
||||
and changed client libraries do it all the time and correctly. (vinodkv)
|
||||
|
||||
BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
YARN-158. Yarn creating package-info.java must not depend on sh.
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.hadoop.security.Credentials;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
|
@ -183,6 +182,9 @@ public class UnmanagedAMLauncher {
|
|||
Credentials credentials = new Credentials();
|
||||
Token<AMRMTokenIdentifier> token =
|
||||
rmClient.getAMRMToken(attemptId.getApplicationId());
|
||||
// Service will be empty but that's okay, we are just passing down only
|
||||
// AMRMToken down to the real AM which eventually sets the correct
|
||||
// service-address.
|
||||
credentials.addToken(token.getService(), token);
|
||||
File tokenFile = File.createTempFile("unmanagedAMRMToken","",
|
||||
new File(System.getProperty("user.dir")));
|
||||
|
|
|
@ -24,12 +24,17 @@ import java.net.InetSocketAddress;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
|
||||
public class ClientRMProxy<T> extends RMProxy<T>{
|
||||
public class ClientRMProxy<T> extends RMProxy<T> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
|
||||
|
||||
|
@ -39,7 +44,24 @@ public class ClientRMProxy<T> extends RMProxy<T>{
|
|||
return createRMProxy(conf, protocol, rmAddress);
|
||||
}
|
||||
|
||||
private static InetSocketAddress getRMAddress(Configuration conf, Class<?> protocol) {
|
||||
private static void setupTokens(InetSocketAddress resourceManagerAddress)
|
||||
throws IOException {
|
||||
// It is assumed for now that the only AMRMToken in AM's UGI is for this
|
||||
// cluster/RM. TODO: Fix later when we have some kind of cluster-ID as
|
||||
// default service-address, see YARN-986.
|
||||
for (Token<? extends TokenIdentifier> token : UserGroupInformation
|
||||
.getCurrentUser().getTokens()) {
|
||||
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
|
||||
// This token needs to be directly provided to the AMs, so set the
|
||||
// appropriate service-name. We'll need more infrastructure when we
|
||||
// need to set it in HA case.
|
||||
SecurityUtil.setTokenService(token, resourceManagerAddress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static InetSocketAddress getRMAddress(Configuration conf,
|
||||
Class<?> protocol) throws IOException {
|
||||
if (protocol == ApplicationClientProtocol.class) {
|
||||
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADDRESS,
|
||||
|
@ -50,10 +72,12 @@ public class ClientRMProxy<T> extends RMProxy<T>{
|
|||
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
|
||||
} else if (protocol == ApplicationMasterProtocol.class) {
|
||||
return conf.getSocketAddr(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||
InetSocketAddress serviceAddr =
|
||||
conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||
setupTokens(serviceAddr);
|
||||
return serviceAddr;
|
||||
} else {
|
||||
String message = "Unsupported protocol found when creating the proxy " +
|
||||
"connection to ResourceManager: " +
|
||||
|
|
|
@ -165,7 +165,7 @@ public abstract class YarnClient extends AbstractService {
|
|||
* @throws IOException
|
||||
*/
|
||||
public abstract org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
|
||||
getAMRMToken(ApplicationId appId) throws YarnException, IOException;
|
||||
getAMRMToken(ApplicationId appId) throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
|
|
@ -33,10 +33,10 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
|
@ -199,15 +199,11 @@ public class YarnClientImpl extends YarnClient {
|
|||
|
||||
public org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
|
||||
getAMRMToken(ApplicationId appId) throws YarnException, IOException {
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = null;
|
||||
ApplicationReport report = getApplicationReport(appId);
|
||||
Token token = report.getAMRMToken();
|
||||
Token token = getApplicationReport(appId).getAMRMToken();
|
||||
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
|
||||
null;
|
||||
if (token != null) {
|
||||
InetSocketAddress address = getConfig().getSocketAddr(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||
amrmToken = ConverterUtils.convertFromYarn(token, address);
|
||||
amrmToken = ConverterUtils.convertFromYarn(token, null);
|
||||
}
|
||||
return amrmToken;
|
||||
}
|
||||
|
|
|
@ -29,23 +29,30 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryProxy;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
@SuppressWarnings("unchecked")
|
||||
public class RMProxy<T> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(RMProxy.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> T createRMProxy(final Configuration conf,
|
||||
final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
|
||||
RetryPolicy retryPolicy = createRetryPolicy(conf);
|
||||
|
@ -54,12 +61,11 @@ public class RMProxy<T> {
|
|||
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected static <T> T getProxy(final Configuration conf,
|
||||
private static <T> T getProxy(final Configuration conf,
|
||||
final Class<T> protocol, final InetSocketAddress rmAddress)
|
||||
throws IOException {
|
||||
return (T) UserGroupInformation.getCurrentUser().doAs(
|
||||
new PrivilegedAction<Object>() {
|
||||
return UserGroupInformation.getCurrentUser().doAs(
|
||||
new PrivilegedAction<T>() {
|
||||
|
||||
@Override
|
||||
public T run() {
|
||||
|
@ -68,6 +74,8 @@ public class RMProxy<T> {
|
|||
});
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public static RetryPolicy createRetryPolicy(Configuration conf) {
|
||||
long rmConnectWaitMS =
|
||||
conf.getInt(
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.yarn.client.RMProxy;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
public class ServerRMProxy<T> extends RMProxy<T>{
|
||||
public class ServerRMProxy<T> extends RMProxy<T> {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
|
||||
|
||||
|
@ -43,8 +43,7 @@ public class ServerRMProxy<T> extends RMProxy<T>{
|
|||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
String message = "Unsupported protocol found when creating the proxy " +
|
||||
"connection to ResourceManager: " +
|
||||
((protocol != null) ? protocol.getClass().getName() : "null");
|
||||
|
@ -52,4 +51,4 @@ public class ServerRMProxy<T> extends RMProxy<T>{
|
|||
throw new IllegalStateException(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -61,6 +61,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* The launch of the AM itself.
|
||||
*/
|
||||
|
@ -224,7 +226,7 @@ public class AMLauncher implements Runnable {
|
|||
}
|
||||
|
||||
// Add AMRMToken
|
||||
Token<AMRMTokenIdentifier> amrmToken = application.getAMRMToken();
|
||||
Token<AMRMTokenIdentifier> amrmToken = getAMRMToken();
|
||||
if (amrmToken != null) {
|
||||
credentials.addToken(amrmToken.getService(), amrmToken);
|
||||
}
|
||||
|
@ -232,6 +234,11 @@ public class AMLauncher implements Runnable {
|
|||
credentials.writeTokenStorageToStream(dob);
|
||||
container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected Token<AMRMTokenIdentifier> getAMRMToken() {
|
||||
return application.getAMRMToken();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void run() {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
|
@ -43,9 +44,9 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
|
@ -376,11 +377,16 @@ public abstract class RMStateStore {
|
|||
protected abstract void removeApplicationState(ApplicationState appState)
|
||||
throws Exception;
|
||||
|
||||
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
|
||||
// YARN-986
|
||||
public static final Text AM_RM_TOKEN_SERVICE = new Text(
|
||||
"AM_RM_TOKEN_SERVICE");
|
||||
|
||||
private Credentials getTokensFromAppAttempt(RMAppAttempt appAttempt) {
|
||||
Credentials credentials = new Credentials();
|
||||
Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken();
|
||||
if(appToken != null){
|
||||
credentials.addToken(appToken.getService(), appToken);
|
||||
credentials.addToken(AM_RM_TOKEN_SERVICE, appToken);
|
||||
}
|
||||
Token<ClientToAMTokenIdentifier> clientToAMToken =
|
||||
appAttempt.getClientToAMToken();
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
|||
|
||||
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -41,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
|
@ -62,7 +60,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenSelector;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSelector;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
|
@ -684,15 +681,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
appAttemptTokens.getAllTokens());
|
||||
}
|
||||
|
||||
InetSocketAddress serviceAddr =
|
||||
conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||
AMRMTokenSelector appTokenSelector = new AMRMTokenSelector();
|
||||
// Only one AMRMToken is stored per-attempt, so this should be fine. Can't
|
||||
// use TokenSelector as service may change - think fail-over.
|
||||
this.amrmToken =
|
||||
appTokenSelector.selectToken(
|
||||
SecurityUtil.buildTokenService(serviceAddr),
|
||||
appAttemptTokens.getAllTokens());
|
||||
(Token<AMRMTokenIdentifier>) appAttemptTokens
|
||||
.getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
|
||||
|
||||
// For now, no need to populate tokens back to AMRMTokenSecretManager,
|
||||
// because running attempts are rebooted. Later in work-preserve restart,
|
||||
|
@ -736,18 +729,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
// create AMRMToken
|
||||
AMRMTokenIdentifier id =
|
||||
new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
|
||||
Token<AMRMTokenIdentifier> amRmToken =
|
||||
appAttempt.amrmToken =
|
||||
new Token<AMRMTokenIdentifier>(id,
|
||||
appAttempt.rmContext.getAMRMTokenSecretManager());
|
||||
InetSocketAddress serviceAddr =
|
||||
appAttempt.conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||
// normally the client should set the service after acquiring the
|
||||
// token, but this token is directly provided to the AMs
|
||||
SecurityUtil.setTokenService(amRmToken, serviceAddr);
|
||||
|
||||
appAttempt.amrmToken = amRmToken;
|
||||
|
||||
// Add the application to the scheduler
|
||||
appAttempt.eventHandler.handle(
|
||||
|
|
|
@ -18,9 +18,15 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||
|
@ -52,6 +58,17 @@ public class MockRMWithCustomAMLauncher extends MockRM {
|
|||
ContainerId containerId) {
|
||||
return containerManager;
|
||||
}
|
||||
@Override
|
||||
protected Token<AMRMTokenIdentifier> getAMRMToken() {
|
||||
Token<AMRMTokenIdentifier> amRmToken = super.getAMRMToken();
|
||||
InetSocketAddress serviceAddr =
|
||||
getConfig().getSocketAddr(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||
SecurityUtil.setTokenService(amRmToken, serviceAddr);
|
||||
return amRmToken;
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
|
|
@ -33,7 +33,10 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||
|
@ -49,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
|
@ -142,6 +146,19 @@ public class TestAMAuthorization {
|
|||
protected ApplicationMasterService createApplicationMasterService() {
|
||||
return new ApplicationMasterService(getRMContext(), this.scheduler);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Token<? extends TokenIdentifier> setupAndReturnAMRMToken(
|
||||
InetSocketAddress rmBindAddress,
|
||||
Collection<Token<? extends TokenIdentifier>> allTokens) {
|
||||
for (Token<? extends TokenIdentifier> token : allTokens) {
|
||||
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
|
||||
SecurityUtil.setTokenService(token, rmBindAddress);
|
||||
return (Token<AMRMTokenIdentifier>) token;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -178,8 +195,12 @@ public class TestAMAuthorization {
|
|||
UserGroupInformation currentUser = UserGroupInformation
|
||||
.createRemoteUser(applicationAttemptId.toString());
|
||||
Credentials credentials = containerManager.getContainerCredentials();
|
||||
currentUser.addCredentials(credentials);
|
||||
|
||||
final InetSocketAddress rmBindAddress =
|
||||
rm.getApplicationMasterService().getBindAddress();
|
||||
Token<? extends TokenIdentifier> amRMToken =
|
||||
MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
|
||||
credentials.getAllTokens());
|
||||
currentUser.addToken(amRMToken);
|
||||
ApplicationMasterProtocol client = currentUser
|
||||
.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
||||
@Override
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -32,6 +33,8 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
|
@ -47,7 +50,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestExceptio
|
|||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
|
@ -274,7 +276,7 @@ public class TestSchedulerUtils {
|
|||
public void testValidateResourceBlacklistRequest() throws Exception {
|
||||
|
||||
MyContainerManager containerManager = new MyContainerManager();
|
||||
final MockRM rm =
|
||||
final MockRMWithAMS rm =
|
||||
new MockRMWithAMS(new YarnConfiguration(), containerManager);
|
||||
rm.start();
|
||||
|
||||
|
@ -298,13 +300,18 @@ public class TestSchedulerUtils {
|
|||
UserGroupInformation currentUser =
|
||||
UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
|
||||
Credentials credentials = containerManager.getContainerCredentials();
|
||||
currentUser.addCredentials(credentials);
|
||||
ApplicationMasterProtocol client = currentUser
|
||||
.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
||||
final InetSocketAddress rmBindAddress =
|
||||
rm.getApplicationMasterService().getBindAddress();
|
||||
Token<? extends TokenIdentifier> amRMToken =
|
||||
MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
|
||||
credentials.getAllTokens());
|
||||
currentUser.addToken(amRMToken);
|
||||
ApplicationMasterProtocol client =
|
||||
currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
||||
@Override
|
||||
public ApplicationMasterProtocol run() {
|
||||
return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rm
|
||||
.getApplicationMasterService().getBindAddress(), conf);
|
||||
return (ApplicationMasterProtocol) rpc.getProxy(
|
||||
ApplicationMasterProtocol.class, rmBindAddress, conf);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.security;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -30,6 +31,8 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
|
@ -81,7 +84,7 @@ public class TestAMRMTokens {
|
|||
public void testTokenExpiry() throws Exception {
|
||||
|
||||
MyContainerManager containerManager = new MyContainerManager();
|
||||
final MockRM rm =
|
||||
final MockRMWithAMS rm =
|
||||
new MockRMWithAMS(conf, containerManager);
|
||||
rm.start();
|
||||
|
||||
|
@ -111,8 +114,12 @@ public class TestAMRMTokens {
|
|||
UserGroupInformation
|
||||
.createRemoteUser(applicationAttemptId.toString());
|
||||
Credentials credentials = containerManager.getContainerCredentials();
|
||||
currentUser.addCredentials(credentials);
|
||||
|
||||
final InetSocketAddress rmBindAddress =
|
||||
rm.getApplicationMasterService().getBindAddress();
|
||||
Token<? extends TokenIdentifier> amRMToken =
|
||||
MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
|
||||
credentials.getAllTokens());
|
||||
currentUser.addToken(amRMToken);
|
||||
rmClient = createRMClient(rm, conf, rpc, currentUser);
|
||||
|
||||
RegisterApplicationMasterRequest request =
|
||||
|
@ -164,7 +171,7 @@ public class TestAMRMTokens {
|
|||
public void testMasterKeyRollOver() throws Exception {
|
||||
|
||||
MyContainerManager containerManager = new MyContainerManager();
|
||||
final MockRM rm =
|
||||
final MockRMWithAMS rm =
|
||||
new MockRMWithAMS(conf, containerManager);
|
||||
rm.start();
|
||||
|
||||
|
@ -194,8 +201,12 @@ public class TestAMRMTokens {
|
|||
UserGroupInformation
|
||||
.createRemoteUser(applicationAttemptId.toString());
|
||||
Credentials credentials = containerManager.getContainerCredentials();
|
||||
currentUser.addCredentials(credentials);
|
||||
|
||||
final InetSocketAddress rmBindAddress =
|
||||
rm.getApplicationMasterService().getBindAddress();
|
||||
Token<? extends TokenIdentifier> amRMToken =
|
||||
MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
|
||||
credentials.getAllTokens());
|
||||
currentUser.addToken(amRMToken);
|
||||
rmClient = createRMClient(rm, conf, rpc, currentUser);
|
||||
|
||||
RegisterApplicationMasterRequest request =
|
||||
|
|
Loading…
Reference in New Issue