YARN-1107. Fixed a bug in ResourceManager because of which RM in secure mode fails to restart. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1520726 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1520727 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7cb29ba170
commit
bbbee0e556
|
@ -141,6 +141,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||||
need more than a node's total capability were incorrectly allocated on that
|
need more than a node's total capability were incorrectly allocated on that
|
||||||
node causing apps to hang. (Omkar Vinit Joshi via vinodkv)
|
node causing apps to hang. (Omkar Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
|
YARN-1107. Fixed a bug in ResourceManager because of which RM in secure mode
|
||||||
|
fails to restart. (Omkar Vinit Joshi via vinodkv)
|
||||||
|
|
||||||
Release 2.1.0-beta - 2013-08-22
|
Release 2.1.0-beta - 2013-08-22
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -43,10 +43,10 @@ import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||||
|
@ -78,7 +78,6 @@ import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
|
@ -88,7 +87,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
@ -160,9 +158,6 @@ public class ClientRMService extends AbstractService implements
|
||||||
this.server.start();
|
this.server.start();
|
||||||
clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
|
clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
|
||||||
server.getListenerAddress());
|
server.getListenerAddress());
|
||||||
// enable RM to short-circuit token operations directly to itself
|
|
||||||
RMDelegationTokenIdentifier.Renewer.setSecretManager(
|
|
||||||
rmDTSecretManager, clientBindAddress);
|
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Context of the ResourceManager.
|
* Context of the ResourceManager.
|
||||||
|
@ -64,4 +65,13 @@ public interface RMContext {
|
||||||
NMTokenSecretManagerInRM getNMTokenSecretManager();
|
NMTokenSecretManagerInRM getNMTokenSecretManager();
|
||||||
|
|
||||||
ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager();
|
ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager();
|
||||||
|
|
||||||
|
void setClientRMService(ClientRMService clientRMService);
|
||||||
|
|
||||||
|
ClientRMService getClientRMService();
|
||||||
|
|
||||||
|
RMDelegationTokenSecretManager getRMDelegationTokenSecretManager();
|
||||||
|
|
||||||
|
void setRMDelegationTokenSecretManager(
|
||||||
|
RMDelegationTokenSecretManager delegationTokenSecretManager);
|
||||||
}
|
}
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -61,6 +62,8 @@ public class RMContextImpl implements RMContext {
|
||||||
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
||||||
private final NMTokenSecretManagerInRM nmTokenSecretManager;
|
private final NMTokenSecretManagerInRM nmTokenSecretManager;
|
||||||
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
|
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
|
||||||
|
private ClientRMService clientRMService;
|
||||||
|
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
|
||||||
|
|
||||||
public RMContextImpl(Dispatcher rmDispatcher,
|
public RMContextImpl(Dispatcher rmDispatcher,
|
||||||
RMStateStore store,
|
RMStateStore store,
|
||||||
|
@ -178,4 +181,25 @@ public class RMContextImpl implements RMContext {
|
||||||
public void setStateStore(RMStateStore store) {
|
public void setStateStore(RMStateStore store) {
|
||||||
stateStore = store;
|
stateStore = store;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ClientRMService getClientRMService() {
|
||||||
|
return this.clientRMService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setClientRMService(ClientRMService clientRMService) {
|
||||||
|
this.clientRMService = clientRMService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
|
||||||
|
return this.rmDelegationTokenSecretManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRMDelegationTokenSecretManager(
|
||||||
|
RMDelegationTokenSecretManager delegationTokenSecretManager) {
|
||||||
|
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -169,11 +169,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
|
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
|
||||||
addService(amFinishingMonitor);
|
addService(amFinishingMonitor);
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
|
||||||
this.delegationTokenRenewer = createDelegationTokenRenewer();
|
|
||||||
addService(delegationTokenRenewer);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
|
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
|
||||||
this.nmTokenSecretManager = createNMTokenSecretManager(conf);
|
this.nmTokenSecretManager = createNMTokenSecretManager(conf);
|
||||||
|
|
||||||
|
@ -200,6 +195,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
ExitUtil.terminate(1, e);
|
ExitUtil.terminate(1, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
this.delegationTokenRenewer = createDelegationTokenRenewer();
|
||||||
|
}
|
||||||
|
|
||||||
this.rmContext =
|
this.rmContext =
|
||||||
new RMContextImpl(this.rmDispatcher, rmStore,
|
new RMContextImpl(this.rmDispatcher, rmStore,
|
||||||
this.containerAllocationExpirer, amLivelinessMonitor,
|
this.containerAllocationExpirer, amLivelinessMonitor,
|
||||||
|
@ -260,7 +259,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
this.rmDispatcher.register(RMAppManagerEventType.class,
|
this.rmDispatcher.register(RMAppManagerEventType.class,
|
||||||
this.rmAppManager);
|
this.rmAppManager);
|
||||||
this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext);
|
this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext);
|
||||||
|
rmContext.setRMDelegationTokenSecretManager(this.rmDTSecretManager);
|
||||||
clientRM = createClientRMService();
|
clientRM = createClientRMService();
|
||||||
|
rmContext.setClientRMService(clientRM);
|
||||||
addService(clientRM);
|
addService(clientRM);
|
||||||
|
|
||||||
adminService = createAdminService(clientRM, masterService, resourceTracker);
|
adminService = createAdminService(clientRM, masterService, resourceTracker);
|
||||||
|
@ -271,7 +272,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
this.applicationMasterLauncher);
|
this.applicationMasterLauncher);
|
||||||
|
|
||||||
addService(applicationMasterLauncher);
|
addService(applicationMasterLauncher);
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
addService(delegationTokenRenewer);
|
||||||
|
delegationTokenRenewer.setRMContext(rmContext);
|
||||||
|
}
|
||||||
new RMNMInfo(this.rmContext, this.scheduler);
|
new RMNMInfo(this.rmContext, this.scheduler);
|
||||||
|
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
|
@ -620,13 +624,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
this.containerTokenSecretManager.start();
|
this.containerTokenSecretManager.start();
|
||||||
this.nmTokenSecretManager.start();
|
this.nmTokenSecretManager.start();
|
||||||
|
|
||||||
// Explicitly start DTRenewer too in secure mode before kicking recovery as
|
|
||||||
// tokens will start getting added for renewal as part of the recovery
|
|
||||||
// process itself.
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
|
||||||
this.delegationTokenRenewer.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
RMStateStore rmStore = rmContext.getStateStore();
|
RMStateStore rmStore = rmContext.getStateStore();
|
||||||
// The state store needs to start irrespective of recoveryEnabled as apps
|
// The state store needs to start irrespective of recoveryEnabled as apps
|
||||||
// need events to move to further states.
|
// need events to move to further states.
|
||||||
|
|
|
@ -34,6 +34,7 @@ import java.util.TimerTask;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -47,6 +48,8 @@ import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -64,6 +67,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
|
|
||||||
// global single timer (daemon)
|
// global single timer (daemon)
|
||||||
private Timer renewalTimer;
|
private Timer renewalTimer;
|
||||||
|
private RMContext rmContext;
|
||||||
|
|
||||||
// delegation token canceler thread
|
// delegation token canceler thread
|
||||||
private DelegationTokenCancelThread dtCancelThread =
|
private DelegationTokenCancelThread dtCancelThread =
|
||||||
|
@ -80,6 +84,9 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
private long tokenRemovalDelayMs;
|
private long tokenRemovalDelayMs;
|
||||||
|
|
||||||
private Thread delayedRemovalThread;
|
private Thread delayedRemovalThread;
|
||||||
|
private boolean isServiceStarted = false;
|
||||||
|
private List<DelegationTokenToRenew> pendingTokenForRenewal =
|
||||||
|
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
|
||||||
|
|
||||||
private boolean tokenKeepAliveEnabled;
|
private boolean tokenKeepAliveEnabled;
|
||||||
|
|
||||||
|
@ -100,7 +107,6 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
|
|
||||||
dtCancelThread.start();
|
dtCancelThread.start();
|
||||||
renewalTimer = new Timer(true);
|
renewalTimer = new Timer(true);
|
||||||
if (tokenKeepAliveEnabled) {
|
if (tokenKeepAliveEnabled) {
|
||||||
|
@ -109,6 +115,15 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
"DelayedTokenCanceller");
|
"DelayedTokenCanceller");
|
||||||
delayedRemovalThread.start();
|
delayedRemovalThread.start();
|
||||||
}
|
}
|
||||||
|
// enable RM to short-circuit token operations directly to itself
|
||||||
|
RMDelegationTokenIdentifier.Renewer.setSecretManager(
|
||||||
|
rmContext.getRMDelegationTokenSecretManager(),
|
||||||
|
rmContext.getClientRMService().getBindAddress());
|
||||||
|
// Delegation token renewal is delayed until ClientRMService starts. As
|
||||||
|
// it is required to short circuit the token renewal calls.
|
||||||
|
isServiceStarted = true;
|
||||||
|
renewIfServiceIsStarted(pendingTokenForRenewal);
|
||||||
|
pendingTokenForRenewal.clear();
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,8 +290,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void addApplication(
|
public void addApplication(
|
||||||
ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
|
ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (ts == null) {
|
if (ts == null) {
|
||||||
return; //nothing to add
|
return; //nothing to add
|
||||||
}
|
}
|
||||||
|
@ -291,25 +306,40 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
|
|
||||||
// find tokens for renewal, but don't add timers until we know
|
// find tokens for renewal, but don't add timers until we know
|
||||||
// all renewable tokens are valid
|
// all renewable tokens are valid
|
||||||
Set<DelegationTokenToRenew> dtrs = new HashSet<DelegationTokenToRenew>();
|
// At RM restart it is safe to assume that all the previously added tokens
|
||||||
|
// are valid
|
||||||
|
List<DelegationTokenToRenew> tokenList =
|
||||||
|
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
|
||||||
for(Token<?> token : tokens) {
|
for(Token<?> token : tokens) {
|
||||||
// first renew happens immediately
|
|
||||||
if (token.isManaged()) {
|
if (token.isManaged()) {
|
||||||
DelegationTokenToRenew dtr =
|
tokenList.add(new DelegationTokenToRenew(applicationId,
|
||||||
new DelegationTokenToRenew(applicationId, token, getConfig(), now,
|
token, getConfig(), now, shouldCancelAtEnd));
|
||||||
shouldCancelAtEnd);
|
|
||||||
renewToken(dtr);
|
|
||||||
dtrs.add(dtr);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (DelegationTokenToRenew dtr : dtrs) {
|
if (!tokenList.isEmpty()){
|
||||||
addTokenToList(dtr);
|
renewIfServiceIsStarted(tokenList);
|
||||||
setTimerForTokenRenewal(dtr);
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
}
|
||||||
LOG.debug("Registering token for renewal for:" +
|
|
||||||
" service = " + dtr.token.getService() +
|
protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
|
||||||
" for appId = " + applicationId);
|
throws IOException {
|
||||||
|
if (isServiceStarted) {
|
||||||
|
// Renewing token and adding it to timer calls are separated purposefully
|
||||||
|
// If user provides incorrect token then it should not be added for
|
||||||
|
// renewal.
|
||||||
|
for (DelegationTokenToRenew dtr : dtrs) {
|
||||||
|
renewToken(dtr);
|
||||||
}
|
}
|
||||||
|
for (DelegationTokenToRenew dtr : dtrs) {
|
||||||
|
addTokenToList(dtr);
|
||||||
|
setTimerForTokenRenewal(dtr);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Registering token for renewal for:" + " service = "
|
||||||
|
+ dtr.token.getService() + " for appId = " + dtr.applicationId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pendingTokenForRenewal.addAll(dtrs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -513,4 +543,7 @@ public class DelegationTokenRenewer extends AbstractService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setRMContext(RMContext rmContext) {
|
||||||
|
this.rmContext = rmContext;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
@ -35,6 +34,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -90,7 +90,7 @@ public class TestRMRestart {
|
||||||
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
||||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
|
||||||
rmAddr = new InetSocketAddress(InetAddress.getLocalHost(), 123);
|
rmAddr = new InetSocketAddress("localhost", 8032);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=180000)
|
@Test (timeout=180000)
|
||||||
|
@ -592,7 +592,12 @@ public class TestRMRestart {
|
||||||
@Test
|
@Test
|
||||||
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
|
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
|
||||||
|
conf.set(
|
||||||
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
"kerberos");
|
||||||
|
conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032");
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
RMState rmState = memStore.getState();
|
RMState rmState = memStore.getState();
|
||||||
|
@ -614,6 +619,8 @@ public class TestRMRestart {
|
||||||
// request a token and add into credential
|
// request a token and add into credential
|
||||||
GetDelegationTokenRequest request1 =
|
GetDelegationTokenRequest request1 =
|
||||||
GetDelegationTokenRequest.newInstance("renewer1");
|
GetDelegationTokenRequest.newInstance("renewer1");
|
||||||
|
UserGroupInformation.getCurrentUser().setAuthenticationMethod(
|
||||||
|
AuthMethod.KERBEROS);
|
||||||
GetDelegationTokenResponse response1 =
|
GetDelegationTokenResponse response1 =
|
||||||
rm1.getClientRMService().getDelegationToken(request1);
|
rm1.getClientRMService().getDelegationToken(request1);
|
||||||
org.apache.hadoop.yarn.api.records.Token delegationToken1 =
|
org.apache.hadoop.yarn.api.records.Token delegationToken1 =
|
||||||
|
@ -644,7 +651,7 @@ public class TestRMRestart {
|
||||||
rm1.getRMDTSecretManager().getAllTokens();
|
rm1.getRMDTSecretManager().getAllTokens();
|
||||||
Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet());
|
Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet());
|
||||||
Assert.assertEquals(allTokensRM1, rmDTState);
|
Assert.assertEquals(allTokensRM1, rmDTState);
|
||||||
|
|
||||||
// assert sequence number is saved
|
// assert sequence number is saved
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
|
rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
|
||||||
|
@ -682,7 +689,7 @@ public class TestRMRestart {
|
||||||
// assert master keys and tokens are populated back to DTSecretManager
|
// assert master keys and tokens are populated back to DTSecretManager
|
||||||
Map<RMDelegationTokenIdentifier, Long> allTokensRM2 =
|
Map<RMDelegationTokenIdentifier, Long> allTokensRM2 =
|
||||||
rm2.getRMDTSecretManager().getAllTokens();
|
rm2.getRMDTSecretManager().getAllTokens();
|
||||||
Assert.assertEquals(allTokensRM1, allTokensRM2);
|
Assert.assertEquals(allTokensRM2.keySet(), allTokensRM1.keySet());
|
||||||
// rm2 has its own master keys when it starts, we use containsAll here
|
// rm2 has its own master keys when it starts, we use containsAll here
|
||||||
Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys()
|
Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys()
|
||||||
.containsAll(allKeysRM1));
|
.containsAll(allKeysRM1));
|
||||||
|
@ -735,15 +742,24 @@ public class TestRMRestart {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doSecureLogin() throws IOException {
|
protected ClientRMService createClientRMService() {
|
||||||
// Do nothing.
|
return new ClientRMService(getRMContext(), getResourceScheduler(),
|
||||||
|
rmAppManager, applicationACLsManager, rmDTSecretManager){
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
//do nothing
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void doSecureLogin() throws IOException {
|
||||||
super.serviceInit(conf);
|
// Do nothing.
|
||||||
RMDelegationTokenIdentifier.Renewer.setSecretManager(
|
|
||||||
this.getRMDTSecretManager(), rmAddr);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,8 +25,10 @@ import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -48,9 +50,12 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenRenewer;
|
import org.apache.hadoop.security.token.TokenRenewer;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -141,6 +146,13 @@ public class TestDelegationTokenRenewer {
|
||||||
Renewer.reset();
|
Renewer.reset();
|
||||||
delegationTokenRenewer = new DelegationTokenRenewer();
|
delegationTokenRenewer = new DelegationTokenRenewer();
|
||||||
delegationTokenRenewer.init(conf);
|
delegationTokenRenewer.init(conf);
|
||||||
|
RMContext mockContext = mock(RMContext.class);
|
||||||
|
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||||
|
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||||
|
InetSocketAddress sockAddr =
|
||||||
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
|
delegationTokenRenewer.setRMContext(mockContext);
|
||||||
delegationTokenRenewer.start();
|
delegationTokenRenewer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -454,6 +466,13 @@ public class TestDelegationTokenRenewer {
|
||||||
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
||||||
1000l);
|
1000l);
|
||||||
localDtr.init(lconf);
|
localDtr.init(lconf);
|
||||||
|
RMContext mockContext = mock(RMContext.class);
|
||||||
|
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||||
|
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||||
|
InetSocketAddress sockAddr =
|
||||||
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
|
localDtr.setRMContext(mockContext);
|
||||||
localDtr.start();
|
localDtr.start();
|
||||||
|
|
||||||
MyFS dfs = (MyFS)FileSystem.get(lconf);
|
MyFS dfs = (MyFS)FileSystem.get(lconf);
|
||||||
|
@ -511,6 +530,13 @@ public class TestDelegationTokenRenewer {
|
||||||
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
|
||||||
1000l);
|
1000l);
|
||||||
localDtr.init(lconf);
|
localDtr.init(lconf);
|
||||||
|
RMContext mockContext = mock(RMContext.class);
|
||||||
|
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||||
|
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||||
|
InetSocketAddress sockAddr =
|
||||||
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
|
localDtr.setRMContext(mockContext);
|
||||||
localDtr.start();
|
localDtr.start();
|
||||||
|
|
||||||
MyFS dfs = (MyFS)FileSystem.get(lconf);
|
MyFS dfs = (MyFS)FileSystem.get(lconf);
|
||||||
|
@ -550,7 +576,7 @@ public class TestDelegationTokenRenewer {
|
||||||
} catch (InvalidToken ite) {}
|
} catch (InvalidToken ite) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=2000)
|
@Test(timeout=20000)
|
||||||
public void testConncurrentAddApplication()
|
public void testConncurrentAddApplication()
|
||||||
throws IOException, InterruptedException, BrokenBarrierException {
|
throws IOException, InterruptedException, BrokenBarrierException {
|
||||||
final CyclicBarrier startBarrier = new CyclicBarrier(2);
|
final CyclicBarrier startBarrier = new CyclicBarrier(2);
|
||||||
|
@ -579,6 +605,13 @@ public class TestDelegationTokenRenewer {
|
||||||
// fire up the renewer
|
// fire up the renewer
|
||||||
final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
|
final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
|
||||||
dtr.init(conf);
|
dtr.init(conf);
|
||||||
|
RMContext mockContext = mock(RMContext.class);
|
||||||
|
ClientRMService mockClientRMService = mock(ClientRMService.class);
|
||||||
|
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
|
||||||
|
InetSocketAddress sockAddr =
|
||||||
|
InetSocketAddress.createUnresolved("localhost", 1234);
|
||||||
|
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
|
||||||
|
dtr.setRMContext(mockContext);
|
||||||
dtr.start();
|
dtr.start();
|
||||||
|
|
||||||
// submit a job that blocks during renewal
|
// submit a job that blocks during renewal
|
||||||
|
|
Loading…
Reference in New Issue