YARN-1094. Fixed a blocker in the RM restart code that caused the RM to crash when trying to recover an existing app. Contributed by Vinod Kumar Vavilapalli.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1517215 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
962da4dcc7
commit
18e805677d
|
@ -93,6 +93,9 @@ Release 2.1.1-beta - UNRELEASED
|
|||
YARN-1085. Modified YARN and MR2 web-apps to do HTTP authentication in
|
||||
secure setup with kerberos. (Omkar Vinit Joshi via vinodkv)
|
||||
|
||||
YARN-1094. Fixed a blocker with RM restart code because of which RM crashes
|
||||
when trying to recover an existing app. (vinodkv)
|
||||
|
||||
Release 2.1.0-beta - 2013-08-22
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -33,7 +33,7 @@ public abstract class GetDelegationTokenRequest {
|
|||
|
||||
@Public
|
||||
@Stable
|
||||
public GetDelegationTokenRequest newInstance(String renewer) {
|
||||
public static GetDelegationTokenRequest newInstance(String renewer) {
|
||||
GetDelegationTokenRequest request =
|
||||
Records.newRecord(GetDelegationTokenRequest.class);
|
||||
request.setRenewer(renewer);
|
||||
|
|
|
@ -56,7 +56,7 @@ public class RMContextImpl implements RMContext {
|
|||
private AMLivelinessMonitor amFinishingMonitor;
|
||||
private RMStateStore stateStore = null;
|
||||
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||
private final DelegationTokenRenewer tokenRenewer;
|
||||
private final DelegationTokenRenewer delegationTokenRenewer;
|
||||
private final AMRMTokenSecretManager amRMTokenSecretManager;
|
||||
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
||||
private final NMTokenSecretManagerInRM nmTokenSecretManager;
|
||||
|
@ -67,7 +67,7 @@ public class RMContextImpl implements RMContext {
|
|||
ContainerAllocationExpirer containerAllocationExpirer,
|
||||
AMLivelinessMonitor amLivelinessMonitor,
|
||||
AMLivelinessMonitor amFinishingMonitor,
|
||||
DelegationTokenRenewer tokenRenewer,
|
||||
DelegationTokenRenewer delegationTokenRenewer,
|
||||
AMRMTokenSecretManager amRMTokenSecretManager,
|
||||
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager,
|
||||
|
@ -77,7 +77,7 @@ public class RMContextImpl implements RMContext {
|
|||
this.containerAllocationExpirer = containerAllocationExpirer;
|
||||
this.amLivelinessMonitor = amLivelinessMonitor;
|
||||
this.amFinishingMonitor = amFinishingMonitor;
|
||||
this.tokenRenewer = tokenRenewer;
|
||||
this.delegationTokenRenewer = delegationTokenRenewer;
|
||||
this.amRMTokenSecretManager = amRMTokenSecretManager;
|
||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||
this.nmTokenSecretManager = nmTokenSecretManager;
|
||||
|
@ -90,13 +90,13 @@ public class RMContextImpl implements RMContext {
|
|||
ContainerAllocationExpirer containerAllocationExpirer,
|
||||
AMLivelinessMonitor amLivelinessMonitor,
|
||||
AMLivelinessMonitor amFinishingMonitor,
|
||||
DelegationTokenRenewer tokenRenewer,
|
||||
DelegationTokenRenewer delegationTokenRenewer,
|
||||
AMRMTokenSecretManager appTokenSecretManager,
|
||||
RMContainerTokenSecretManager containerTokenSecretManager,
|
||||
NMTokenSecretManagerInRM nmTokenSecretManager,
|
||||
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
|
||||
this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
|
||||
amFinishingMonitor, tokenRenewer, appTokenSecretManager,
|
||||
amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager,
|
||||
containerTokenSecretManager, nmTokenSecretManager,
|
||||
clientToAMTokenSecretManager);
|
||||
RMStateStore nullStore = new NullRMStateStore();
|
||||
|
@ -151,7 +151,7 @@ public class RMContextImpl implements RMContext {
|
|||
|
||||
@Override
|
||||
public DelegationTokenRenewer getDelegationTokenRenewer() {
|
||||
return tokenRenewer;
|
||||
return delegationTokenRenewer;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -130,6 +130,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
protected RMAppManager rmAppManager;
|
||||
protected ApplicationACLsManager applicationACLsManager;
|
||||
protected RMDelegationTokenSecretManager rmDTSecretManager;
|
||||
private DelegationTokenRenewer delegationTokenRenewer;
|
||||
private WebApp webApp;
|
||||
protected RMContext rmContext;
|
||||
protected ResourceTrackerService resourceTracker;
|
||||
|
@ -169,8 +170,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
|
||||
addService(amFinishingMonitor);
|
||||
|
||||
DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer();
|
||||
addService(tokenRenewer);
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
this.delegationTokenRenewer = createDelegationTokenRenewer();
|
||||
addService(delegationTokenRenewer);
|
||||
}
|
||||
|
||||
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
|
||||
this.nmTokenSecretManager = createNMTokenSecretManager(conf);
|
||||
|
@ -201,7 +204,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
this.rmContext =
|
||||
new RMContextImpl(this.rmDispatcher, rmStore,
|
||||
this.containerAllocationExpirer, amLivelinessMonitor,
|
||||
amFinishingMonitor, tokenRenewer, this.amRmTokenSecretManager,
|
||||
amFinishingMonitor, delegationTokenRenewer, this.amRmTokenSecretManager,
|
||||
this.containerTokenSecretManager, this.nmTokenSecretManager,
|
||||
this.clientToAMSecretManager);
|
||||
|
||||
|
@ -610,6 +613,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
this.containerTokenSecretManager.start();
|
||||
this.nmTokenSecretManager.start();
|
||||
|
||||
// Explicitly start DTRenewer too in secure mode before kicking recovery as
|
||||
// tokens will start getting added for renewal as part of the recovery
|
||||
// process itself.
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
this.delegationTokenRenewer.start();
|
||||
}
|
||||
|
||||
RMStateStore rmStore = rmContext.getStateStore();
|
||||
// The state store needs to start irrespective of recoveryEnabled as apps
|
||||
// need events to move to further states.
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
|
@ -602,8 +603,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
new NodeUpdateSchedulerEvent(rmNode));
|
||||
}
|
||||
|
||||
// Update DTRenewer in secure mode to keep these apps alive. Today this is
|
||||
// needed for log-aggregation to finish long after the apps are gone.
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
|
||||
statusEvent.getKeepAliveAppIds());
|
||||
}
|
||||
|
||||
return NodeState.RUNNING;
|
||||
}
|
||||
|
|
|
@ -18,10 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -35,6 +35,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
|
@ -63,7 +64,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.log4j.Level;
|
||||
|
@ -77,8 +77,11 @@ public class TestRMRestart {
|
|||
|
||||
private YarnConfiguration conf;
|
||||
|
||||
// Fake rmAddr for token-renewal
|
||||
private static InetSocketAddress rmAddr;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
public void setup() throws UnknownHostException {
|
||||
Logger rootLogger = LogManager.getRootLogger();
|
||||
rootLogger.setLevel(Level.DEBUG);
|
||||
ExitUtil.disableSystemExit();
|
||||
|
@ -86,6 +89,8 @@ public class TestRMRestart {
|
|||
UserGroupInformation.setConfiguration(conf);
|
||||
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
|
||||
rmAddr = new InetSocketAddress(InetAddress.getLocalHost(), 123);
|
||||
}
|
||||
|
||||
@Test (timeout=180000)
|
||||
|
@ -446,6 +451,7 @@ public class TestRMRestart {
|
|||
Token<RMDelegationTokenIdentifier> token1 =
|
||||
new Token<RMDelegationTokenIdentifier>(dtId1,
|
||||
rm1.getRMDTSecretManager());
|
||||
SecurityUtil.setTokenService(token1, rmAddr);
|
||||
ts.addToken(userText1, token1);
|
||||
tokenSet.add(token1);
|
||||
|
||||
|
@ -456,6 +462,7 @@ public class TestRMRestart {
|
|||
Token<RMDelegationTokenIdentifier> token2 =
|
||||
new Token<RMDelegationTokenIdentifier>(dtId2,
|
||||
rm1.getRMDTSecretManager());
|
||||
SecurityUtil.setTokenService(token2, rmAddr);
|
||||
ts.addToken(userText2, token2);
|
||||
tokenSet.add(token2);
|
||||
|
||||
|
@ -575,6 +582,7 @@ public class TestRMRestart {
|
|||
@Test
|
||||
public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
|
@ -587,20 +595,21 @@ public class TestRMRestart {
|
|||
rmState.getRMDTSecretManagerState().getMasterKeyState();
|
||||
|
||||
MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
||||
|
||||
rm1.start();
|
||||
|
||||
// create an empty credential
|
||||
Credentials ts = new Credentials();
|
||||
|
||||
// request a token and add into credential
|
||||
GetDelegationTokenRequest request1 = mock(GetDelegationTokenRequest.class);
|
||||
when(request1.getRenewer()).thenReturn("renewer1");
|
||||
GetDelegationTokenRequest request1 =
|
||||
GetDelegationTokenRequest.newInstance("renewer1");
|
||||
GetDelegationTokenResponse response1 =
|
||||
rm1.getClientRMService().getDelegationToken(request1);
|
||||
org.apache.hadoop.yarn.api.records.Token delegationToken1 =
|
||||
response1.getRMDelegationToken();
|
||||
Token<RMDelegationTokenIdentifier> token1 =
|
||||
ConverterUtils.convertFromYarn(delegationToken1, null);
|
||||
ConverterUtils.convertFromYarn(delegationToken1, rmAddr);
|
||||
RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
|
||||
|
||||
HashSet<RMDelegationTokenIdentifier> tokenIdentSet =
|
||||
|
@ -632,14 +641,14 @@ public class TestRMRestart {
|
|||
rmState.getRMDTSecretManagerState().getDTSequenceNumber());
|
||||
|
||||
// request one more token
|
||||
GetDelegationTokenRequest request2 = mock(GetDelegationTokenRequest.class);
|
||||
when(request2.getRenewer()).thenReturn("renewer2");
|
||||
GetDelegationTokenRequest request2 =
|
||||
GetDelegationTokenRequest.newInstance("renewer2");
|
||||
GetDelegationTokenResponse response2 =
|
||||
rm1.getClientRMService().getDelegationToken(request2);
|
||||
org.apache.hadoop.yarn.api.records.Token delegationToken2 =
|
||||
response2.getRMDelegationToken();
|
||||
Token<RMDelegationTokenIdentifier> token2 =
|
||||
ConverterUtils.convertFromYarn(delegationToken2, null);
|
||||
ConverterUtils.convertFromYarn(delegationToken2, rmAddr);
|
||||
RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier();
|
||||
|
||||
// cancel token2
|
||||
|
@ -721,20 +730,10 @@ public class TestRMRestart {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
return new DelegationTokenRenewer() {
|
||||
@Override
|
||||
protected void renewToken(final DelegationTokenToRenew dttr)
|
||||
throws IOException {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
|
||||
throws IOException {
|
||||
// Do nothing
|
||||
}
|
||||
};
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
RMDelegationTokenIdentifier.Renewer.setSecretManager(
|
||||
this.getRMDTSecretManager(), rmAddr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue