YARN-1107. Fixed a bug in ResourceManager because of which RM in secure mode fails to restart. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1520726 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-09-07 05:43:04 +00:00
parent efc1048ffe
commit f35983b805
8 changed files with 160 additions and 49 deletions

View File

@ -156,6 +156,9 @@ Release 2.1.1-beta - UNRELEASED
need more than a node's total capability were incorrectly allocated on that
node causing apps to hang. (Omkar Vinit Joshi via vinodkv)
YARN-1107. Fixed a bug in ResourceManager because of which RM in secure mode
fails to restart. (Omkar Vinit Joshi via vinodkv)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -43,10 +43,10 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
@ -78,7 +78,6 @@ import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
@ -88,7 +87,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@ -160,9 +158,6 @@ public class ClientRMService extends AbstractService implements
this.server.start();
clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
server.getListenerAddress());
// enable RM to short-circuit token operations directly to itself
RMDelegationTokenIdentifier.Renewer.setSecretManager(
rmDTSecretManager, clientBindAddress);
super.serviceStart();
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
/**
* Context of the ResourceManager.
@ -64,4 +65,13 @@ public interface RMContext {
NMTokenSecretManagerInRM getNMTokenSecretManager();
ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager();
void setClientRMService(ClientRMService clientRMService);
ClientRMService getClientRMService();
RMDelegationTokenSecretManager getRMDelegationTokenSecretManager();
void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager);
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import com.google.common.annotations.VisibleForTesting;
@ -61,6 +62,8 @@ public class RMContextImpl implements RMContext {
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
public RMContextImpl(Dispatcher rmDispatcher,
RMStateStore store,
@ -178,4 +181,25 @@ public class RMContextImpl implements RMContext {
public void setStateStore(RMStateStore store) {
stateStore = store;
}
@Override
public ClientRMService getClientRMService() {
return this.clientRMService;
}
@Override
public void setClientRMService(ClientRMService clientRMService) {
this.clientRMService = clientRMService;
}
@Override
public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
return this.rmDelegationTokenSecretManager;
}
@Override
public void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager) {
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
}
}

View File

@ -169,11 +169,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
if (UserGroupInformation.isSecurityEnabled()) {
this.delegationTokenRenewer = createDelegationTokenRenewer();
addService(delegationTokenRenewer);
}
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
this.nmTokenSecretManager = createNMTokenSecretManager(conf);
@ -200,6 +195,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
ExitUtil.terminate(1, e);
}
if (UserGroupInformation.isSecurityEnabled()) {
this.delegationTokenRenewer = createDelegationTokenRenewer();
}
this.rmContext =
new RMContextImpl(this.rmDispatcher, rmStore,
this.containerAllocationExpirer, amLivelinessMonitor,
@ -260,7 +259,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.rmDispatcher.register(RMAppManagerEventType.class,
this.rmAppManager);
this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext);
rmContext.setRMDelegationTokenSecretManager(this.rmDTSecretManager);
clientRM = createClientRMService();
rmContext.setClientRMService(clientRM);
addService(clientRM);
adminService = createAdminService(clientRM, masterService, resourceTracker);
@ -271,7 +272,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.applicationMasterLauncher);
addService(applicationMasterLauncher);
if (UserGroupInformation.isSecurityEnabled()) {
addService(delegationTokenRenewer);
delegationTokenRenewer.setRMContext(rmContext);
}
new RMNMInfo(this.rmContext, this.scheduler);
super.serviceInit(conf);
@ -620,13 +624,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.containerTokenSecretManager.start();
this.nmTokenSecretManager.start();
// Explicitly start DTRenewer too in secure mode before kicking recovery as
// tokens will start getting added for renewal as part of the recovery
// process itself.
if (UserGroupInformation.isSecurityEnabled()) {
this.delegationTokenRenewer.start();
}
RMStateStore rmStore = rmContext.getStateStore();
// The state store needs to start irrespective of recoveryEnabled as apps
// need events to move to further states.

View File

@ -34,6 +34,7 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -47,6 +48,8 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import com.google.common.annotations.VisibleForTesting;
@ -64,6 +67,7 @@ public class DelegationTokenRenewer extends AbstractService {
// global single timer (daemon)
private Timer renewalTimer;
private RMContext rmContext;
// delegation token canceler thread
private DelegationTokenCancelThread dtCancelThread =
@ -80,6 +84,9 @@ public class DelegationTokenRenewer extends AbstractService {
private long tokenRemovalDelayMs;
private Thread delayedRemovalThread;
private boolean isServiceStarted = false;
private List<DelegationTokenToRenew> pendingTokenForRenewal =
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
private boolean tokenKeepAliveEnabled;
@ -100,7 +107,6 @@ public class DelegationTokenRenewer extends AbstractService {
@Override
protected void serviceStart() throws Exception {
dtCancelThread.start();
renewalTimer = new Timer(true);
if (tokenKeepAliveEnabled) {
@ -109,6 +115,15 @@ public class DelegationTokenRenewer extends AbstractService {
"DelayedTokenCanceller");
delayedRemovalThread.start();
}
// enable RM to short-circuit token operations directly to itself
RMDelegationTokenIdentifier.Renewer.setSecretManager(
rmContext.getRMDelegationTokenSecretManager(),
rmContext.getClientRMService().getBindAddress());
// Delegation token renewal is delayed until ClientRMService starts. As
// it is required to short circuit the token renewal calls.
isServiceStarted = true;
renewIfServiceIsStarted(pendingTokenForRenewal);
pendingTokenForRenewal.clear();
super.serviceStart();
}
@ -291,26 +306,41 @@ public class DelegationTokenRenewer extends AbstractService {
// find tokens for renewal, but don't add timers until we know
// all renewable tokens are valid
Set<DelegationTokenToRenew> dtrs = new HashSet<DelegationTokenToRenew>();
// At RM restart it is safe to assume that all the previously added tokens
// are valid
List<DelegationTokenToRenew> tokenList =
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
for(Token<?> token : tokens) {
// first renew happens immediately
if (token.isManaged()) {
DelegationTokenToRenew dtr =
new DelegationTokenToRenew(applicationId, token, getConfig(), now,
shouldCancelAtEnd);
renewToken(dtr);
dtrs.add(dtr);
tokenList.add(new DelegationTokenToRenew(applicationId,
token, getConfig(), now, shouldCancelAtEnd));
}
}
if (!tokenList.isEmpty()){
renewIfServiceIsStarted(tokenList);
}
}
protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
throws IOException {
if (isServiceStarted) {
// Renewing token and adding it to timer calls are separated purposefully
// If user provides incorrect token then it should not be added for
// renewal.
for (DelegationTokenToRenew dtr : dtrs) {
renewToken(dtr);
}
for (DelegationTokenToRenew dtr : dtrs) {
addTokenToList(dtr);
setTimerForTokenRenewal(dtr);
if (LOG.isDebugEnabled()) {
LOG.debug("Registering token for renewal for:" +
" service = " + dtr.token.getService() +
" for appId = " + applicationId);
LOG.debug("Registering token for renewal for:" + " service = "
+ dtr.token.getService() + " for appId = " + dtr.applicationId);
}
}
} else {
pendingTokenForRenewal.addAll(dtrs);
}
}
/**
@ -513,4 +543,7 @@ public class DelegationTokenRenewer extends AbstractService {
}
}
public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@ -35,6 +34,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -90,7 +90,7 @@ public class TestRMRestart {
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
rmAddr = new InetSocketAddress(InetAddress.getLocalHost(), 123);
rmAddr = new InetSocketAddress("localhost", 8032);
}
@Test (timeout=180000)
@ -593,6 +593,11 @@ public class TestRMRestart {
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032");
UserGroupInformation.setConfiguration(conf);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
@ -614,6 +619,8 @@ public class TestRMRestart {
// request a token and add into credential
GetDelegationTokenRequest request1 =
GetDelegationTokenRequest.newInstance("renewer1");
UserGroupInformation.getCurrentUser().setAuthenticationMethod(
AuthMethod.KERBEROS);
GetDelegationTokenResponse response1 =
rm1.getClientRMService().getDelegationToken(request1);
org.apache.hadoop.yarn.api.records.Token delegationToken1 =
@ -682,7 +689,7 @@ public class TestRMRestart {
// assert master keys and tokens are populated back to DTSecretManager
Map<RMDelegationTokenIdentifier, Long> allTokensRM2 =
rm2.getRMDTSecretManager().getAllTokens();
Assert.assertEquals(allTokensRM1, allTokensRM2);
Assert.assertEquals(allTokensRM2.keySet(), allTokensRM1.keySet());
// rm2 has its own master keys when it starts, we use containsAll here
Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys()
.containsAll(allKeysRM1));
@ -735,15 +742,24 @@ public class TestRMRestart {
}
@Override
protected void doSecureLogin() throws IOException {
// Do nothing.
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
rmAppManager, applicationACLsManager, rmDTSecretManager){
@Override
protected void serviceStart() throws Exception {
// do nothing
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
RMDelegationTokenIdentifier.Renewer.setSecretManager(
this.getRMDTSecretManager(), rmAddr);
protected void serviceStop() throws Exception {
//do nothing
}
};
}
@Override
protected void doSecureLogin() throws IOException {
// Do nothing.
}
}
}

View File

@ -25,8 +25,10 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
@ -48,9 +50,12 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
@ -141,6 +146,13 @@ public class TestDelegationTokenRenewer {
Renewer.reset();
delegationTokenRenewer = new DelegationTokenRenewer();
delegationTokenRenewer.init(conf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
delegationTokenRenewer.setRMContext(mockContext);
delegationTokenRenewer.start();
}
@ -454,6 +466,13 @@ public class TestDelegationTokenRenewer {
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
localDtr.init(lconf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
localDtr.setRMContext(mockContext);
localDtr.start();
MyFS dfs = (MyFS)FileSystem.get(lconf);
@ -511,6 +530,13 @@ public class TestDelegationTokenRenewer {
YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
1000l);
localDtr.init(lconf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
localDtr.setRMContext(mockContext);
localDtr.start();
MyFS dfs = (MyFS)FileSystem.get(lconf);
@ -550,7 +576,7 @@ public class TestDelegationTokenRenewer {
} catch (InvalidToken ite) {}
}
@Test(timeout=2000)
@Test(timeout=20000)
public void testConncurrentAddApplication()
throws IOException, InterruptedException, BrokenBarrierException {
final CyclicBarrier startBarrier = new CyclicBarrier(2);
@ -579,6 +605,13 @@ public class TestDelegationTokenRenewer {
// fire up the renewer
final DelegationTokenRenewer dtr = new DelegationTokenRenewer();
dtr.init(conf);
RMContext mockContext = mock(RMContext.class);
ClientRMService mockClientRMService = mock(ClientRMService.class);
when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
dtr.setRMContext(mockContext);
dtr.start();
// submit a job that blocks during renewal