YARN-1094. Fixed a blocker with RM restart code because of which RM crashes when try to recover an existing app. Contributed by Vinod Kumar Vavilapalli.

svn merge --ignore-ancestry -c 1517215 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1517216 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-08-24 23:33:15 +00:00
parent 2d9c88f45e
commit fbab2acd46
6 changed files with 54 additions and 37 deletions

View File

@ -78,6 +78,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-1085. Modified YARN and MR2 web-apps to do HTTP authentication in YARN-1085. Modified YARN and MR2 web-apps to do HTTP authentication in
secure setup with kerberos. (Omkar Vinit Joshi via vinodkv) secure setup with kerberos. (Omkar Vinit Joshi via vinodkv)
YARN-1094. Fixed a blocker with RM restart code because of which RM crashes
when try to recover an existing app. (vinodkv)
Release 2.1.0-beta - 2013-08-22 Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -33,7 +33,7 @@ public abstract class GetDelegationTokenRequest {
@Public @Public
@Stable @Stable
public GetDelegationTokenRequest newInstance(String renewer) { public static GetDelegationTokenRequest newInstance(String renewer) {
GetDelegationTokenRequest request = GetDelegationTokenRequest request =
Records.newRecord(GetDelegationTokenRequest.class); Records.newRecord(GetDelegationTokenRequest.class);
request.setRenewer(renewer); request.setRenewer(renewer);

View File

@ -56,7 +56,7 @@ public class RMContextImpl implements RMContext {
private AMLivelinessMonitor amFinishingMonitor; private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null; private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer; private ContainerAllocationExpirer containerAllocationExpirer;
private final DelegationTokenRenewer tokenRenewer; private final DelegationTokenRenewer delegationTokenRenewer;
private final AMRMTokenSecretManager amRMTokenSecretManager; private final AMRMTokenSecretManager amRMTokenSecretManager;
private final RMContainerTokenSecretManager containerTokenSecretManager; private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager;
@ -67,7 +67,7 @@ public class RMContextImpl implements RMContext {
ContainerAllocationExpirer containerAllocationExpirer, ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor, AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor, AMLivelinessMonitor amFinishingMonitor,
DelegationTokenRenewer tokenRenewer, DelegationTokenRenewer delegationTokenRenewer,
AMRMTokenSecretManager amRMTokenSecretManager, AMRMTokenSecretManager amRMTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager, RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager,
@ -77,7 +77,7 @@ public class RMContextImpl implements RMContext {
this.containerAllocationExpirer = containerAllocationExpirer; this.containerAllocationExpirer = containerAllocationExpirer;
this.amLivelinessMonitor = amLivelinessMonitor; this.amLivelinessMonitor = amLivelinessMonitor;
this.amFinishingMonitor = amFinishingMonitor; this.amFinishingMonitor = amFinishingMonitor;
this.tokenRenewer = tokenRenewer; this.delegationTokenRenewer = delegationTokenRenewer;
this.amRMTokenSecretManager = amRMTokenSecretManager; this.amRMTokenSecretManager = amRMTokenSecretManager;
this.containerTokenSecretManager = containerTokenSecretManager; this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager;
@ -90,13 +90,13 @@ public class RMContextImpl implements RMContext {
ContainerAllocationExpirer containerAllocationExpirer, ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor, AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor, AMLivelinessMonitor amFinishingMonitor,
DelegationTokenRenewer tokenRenewer, DelegationTokenRenewer delegationTokenRenewer,
AMRMTokenSecretManager appTokenSecretManager, AMRMTokenSecretManager appTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager, RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor, this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, tokenRenewer, appTokenSecretManager, amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager, containerTokenSecretManager, nmTokenSecretManager,
clientToAMTokenSecretManager); clientToAMTokenSecretManager);
RMStateStore nullStore = new NullRMStateStore(); RMStateStore nullStore = new NullRMStateStore();
@ -151,7 +151,7 @@ public class RMContextImpl implements RMContext {
@Override @Override
public DelegationTokenRenewer getDelegationTokenRenewer() { public DelegationTokenRenewer getDelegationTokenRenewer() {
return tokenRenewer; return delegationTokenRenewer;
} }
@Override @Override

View File

@ -130,6 +130,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMAppManager rmAppManager; protected RMAppManager rmAppManager;
protected ApplicationACLsManager applicationACLsManager; protected ApplicationACLsManager applicationACLsManager;
protected RMDelegationTokenSecretManager rmDTSecretManager; protected RMDelegationTokenSecretManager rmDTSecretManager;
private DelegationTokenRenewer delegationTokenRenewer;
private WebApp webApp; private WebApp webApp;
protected RMContext rmContext; protected RMContext rmContext;
protected ResourceTrackerService resourceTracker; protected ResourceTrackerService resourceTracker;
@ -169,8 +170,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor); addService(amFinishingMonitor);
DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer(); if (UserGroupInformation.isSecurityEnabled()) {
addService(tokenRenewer); this.delegationTokenRenewer = createDelegationTokenRenewer();
addService(delegationTokenRenewer);
}
this.containerTokenSecretManager = createContainerTokenSecretManager(conf); this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
this.nmTokenSecretManager = createNMTokenSecretManager(conf); this.nmTokenSecretManager = createNMTokenSecretManager(conf);
@ -201,7 +204,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.rmContext = this.rmContext =
new RMContextImpl(this.rmDispatcher, rmStore, new RMContextImpl(this.rmDispatcher, rmStore,
this.containerAllocationExpirer, amLivelinessMonitor, this.containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, tokenRenewer, this.amRmTokenSecretManager, amFinishingMonitor, delegationTokenRenewer, this.amRmTokenSecretManager,
this.containerTokenSecretManager, this.nmTokenSecretManager, this.containerTokenSecretManager, this.nmTokenSecretManager,
this.clientToAMSecretManager); this.clientToAMSecretManager);
@ -610,6 +613,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.containerTokenSecretManager.start(); this.containerTokenSecretManager.start();
this.nmTokenSecretManager.start(); this.nmTokenSecretManager.start();
// Explicitly start DTRenewer too in secure mode before kicking recovery as
// tokens will start getting added for renewal as part of the recovery
// process itself.
if (UserGroupInformation.isSecurityEnabled()) {
this.delegationTokenRenewer.start();
}
RMStateStore rmStore = rmContext.getStateStore(); RMStateStore rmStore = rmContext.getStateStore();
// The state store needs to start irrespective of recoveryEnabled as apps // The state store needs to start irrespective of recoveryEnabled as apps
// need events to move to further states. // need events to move to further states.

View File

@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
@ -602,8 +603,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new NodeUpdateSchedulerEvent(rmNode)); new NodeUpdateSchedulerEvent(rmNode));
} }
// Update DTRenewer in secure mode to keep these apps alive. Today this is
// needed for log-aggregation to finish long after the apps are gone.
if (UserGroupInformation.isSecurityEnabled()) {
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications( rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
statusEvent.getKeepAliveAppIds()); statusEvent.getKeepAliveAppIds());
}
return NodeState.RUNNING; return NodeState.RUNNING;
} }

View File

@ -18,10 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -35,6 +35,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -63,7 +64,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -77,8 +77,11 @@ public class TestRMRestart {
private YarnConfiguration conf; private YarnConfiguration conf;
// Fake rmAddr for token-renewal
private static InetSocketAddress rmAddr;
@Before @Before
public void setup() { public void setup() throws UnknownHostException {
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit(); ExitUtil.disableSystemExit();
@ -86,6 +89,8 @@ public class TestRMRestart {
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
rmAddr = new InetSocketAddress(InetAddress.getLocalHost(), 123);
} }
@Test (timeout=180000) @Test (timeout=180000)
@ -446,6 +451,7 @@ public class TestRMRestart {
Token<RMDelegationTokenIdentifier> token1 = Token<RMDelegationTokenIdentifier> token1 =
new Token<RMDelegationTokenIdentifier>(dtId1, new Token<RMDelegationTokenIdentifier>(dtId1,
rm1.getRMDTSecretManager()); rm1.getRMDTSecretManager());
SecurityUtil.setTokenService(token1, rmAddr);
ts.addToken(userText1, token1); ts.addToken(userText1, token1);
tokenSet.add(token1); tokenSet.add(token1);
@ -456,6 +462,7 @@ public class TestRMRestart {
Token<RMDelegationTokenIdentifier> token2 = Token<RMDelegationTokenIdentifier> token2 =
new Token<RMDelegationTokenIdentifier>(dtId2, new Token<RMDelegationTokenIdentifier>(dtId2,
rm1.getRMDTSecretManager()); rm1.getRMDTSecretManager());
SecurityUtil.setTokenService(token2, rmAddr);
ts.addToken(userText2, token2); ts.addToken(userText2, token2);
tokenSet.add(token2); tokenSet.add(token2);
@ -575,6 +582,7 @@ public class TestRMRestart {
@Test @Test
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MemoryRMStateStore memStore = new MemoryRMStateStore(); MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf); memStore.init(conf);
RMState rmState = memStore.getState(); RMState rmState = memStore.getState();
@ -587,20 +595,21 @@ public class TestRMRestart {
rmState.getRMDTSecretManagerState().getMasterKeyState(); rmState.getRMDTSecretManagerState().getMasterKeyState();
MockRM rm1 = new TestSecurityMockRM(conf, memStore); MockRM rm1 = new TestSecurityMockRM(conf, memStore);
rm1.start(); rm1.start();
// create an empty credential // create an empty credential
Credentials ts = new Credentials(); Credentials ts = new Credentials();
// request a token and add into credential // request a token and add into credential
GetDelegationTokenRequest request1 = mock(GetDelegationTokenRequest.class); GetDelegationTokenRequest request1 =
when(request1.getRenewer()).thenReturn("renewer1"); GetDelegationTokenRequest.newInstance("renewer1");
GetDelegationTokenResponse response1 = GetDelegationTokenResponse response1 =
rm1.getClientRMService().getDelegationToken(request1); rm1.getClientRMService().getDelegationToken(request1);
org.apache.hadoop.yarn.api.records.Token delegationToken1 = org.apache.hadoop.yarn.api.records.Token delegationToken1 =
response1.getRMDelegationToken(); response1.getRMDelegationToken();
Token<RMDelegationTokenIdentifier> token1 = Token<RMDelegationTokenIdentifier> token1 =
ConverterUtils.convertFromYarn(delegationToken1, null); ConverterUtils.convertFromYarn(delegationToken1, rmAddr);
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier(); RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
HashSet<RMDelegationTokenIdentifier> tokenIdentSet = HashSet<RMDelegationTokenIdentifier> tokenIdentSet =
@ -632,14 +641,14 @@ public class TestRMRestart {
rmState.getRMDTSecretManagerState().getDTSequenceNumber()); rmState.getRMDTSecretManagerState().getDTSequenceNumber());
// request one more token // request one more token
GetDelegationTokenRequest request2 = mock(GetDelegationTokenRequest.class); GetDelegationTokenRequest request2 =
when(request2.getRenewer()).thenReturn("renewer2"); GetDelegationTokenRequest.newInstance("renewer2");
GetDelegationTokenResponse response2 = GetDelegationTokenResponse response2 =
rm1.getClientRMService().getDelegationToken(request2); rm1.getClientRMService().getDelegationToken(request2);
org.apache.hadoop.yarn.api.records.Token delegationToken2 = org.apache.hadoop.yarn.api.records.Token delegationToken2 =
response2.getRMDelegationToken(); response2.getRMDelegationToken();
Token<RMDelegationTokenIdentifier> token2 = Token<RMDelegationTokenIdentifier> token2 =
ConverterUtils.convertFromYarn(delegationToken2, null); ConverterUtils.convertFromYarn(delegationToken2, rmAddr);
RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier(); RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier();
// cancel token2 // cancel token2
@ -721,20 +730,10 @@ public class TestRMRestart {
} }
@Override @Override
protected DelegationTokenRenewer createDelegationTokenRenewer() { protected void serviceInit(Configuration conf) throws Exception {
return new DelegationTokenRenewer() { super.serviceInit(conf);
@Override RMDelegationTokenIdentifier.Renewer.setSecretManager(
protected void renewToken(final DelegationTokenToRenew dttr) this.getRMDTSecretManager(), rmAddr);
throws IOException {
// Do nothing
}
@Override
protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
throws IOException {
// Do nothing
}
};
} }
} }
} }