YARN-581. Added a test to verify that app delegation tokens are restored after RM restart. Contributed by Jian He.

svn merge --ignore-ancestry -c 1471187 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1471188 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-23 22:51:19 +00:00
parent 23f099da0b
commit 700a6aa0a0
4 changed files with 170 additions and 17 deletions

View File

@ -100,6 +100,9 @@ Release 2.0.5-beta - UNRELEASED
filecache sub-directory under application directory. (Omkar Vinit Joshi via filecache sub-directory under application directory. (Omkar Vinit Joshi via
vinodkv) vinodkv)
YARN-581. Added a test to verify that app delegation tokens are restored
after RM restart. (Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Service to renew application delegation tokens. * Service to renew application delegation tokens.
*/ */
@ -139,7 +141,8 @@ public class DelegationTokenRenewer extends AbstractService {
* class that is used for keeping tracks of DT to renew * class that is used for keeping tracks of DT to renew
* *
*/ */
private static class DelegationTokenToRenew { @VisibleForTesting
protected static class DelegationTokenToRenew {
public final Token<?> token; public final Token<?> token;
public final ApplicationId applicationId; public final ApplicationId applicationId;
public final Configuration conf; public final Configuration conf;
@ -252,7 +255,16 @@ public class DelegationTokenRenewer extends AbstractService {
private void addTokenToList(DelegationTokenToRenew t) { private void addTokenToList(DelegationTokenToRenew t) {
delegationTokens.add(t); delegationTokens.add(t);
} }
@VisibleForTesting
public Set<Token<?>> getDelegationTokens() {
Set<Token<?>> tokens = new HashSet<Token<?>>();
for(DelegationTokenToRenew delegationToken : delegationTokens) {
tokens.add(delegationToken.token);
}
return tokens;
}
/** /**
* Add application tokens for renewal. * Add application tokens for renewal.
* @param applicationId added application * @param applicationId added application
@ -343,7 +355,8 @@ public class DelegationTokenRenewer extends AbstractService {
/** /**
* set task to renew the token * set task to renew the token
*/ */
private void setTimerForTokenRenewal(DelegationTokenToRenew token) @VisibleForTesting
protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
throws IOException { throws IOException {
// calculate timer time // calculate timer time
@ -358,7 +371,8 @@ public class DelegationTokenRenewer extends AbstractService {
} }
// renew a token // renew a token
private void renewToken(final DelegationTokenToRenew dttr) @VisibleForTesting
protected void renewToken(final DelegationTokenToRenew dttr)
throws IOException { throws IOException {
// need to use doAs so that http can find the kerberos tgt // need to use doAs so that http can find the kerberos tgt
// NOTE: token renewers should be responsible for the correct UGI! // NOTE: token renewers should be responsible for the correct UGI!

View File

@ -18,12 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.Map; import java.util.Map;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -130,26 +134,26 @@ public class MockRM extends ResourceManager {
public RMApp submitApp(int masterMemory, String name, String user) throws Exception { public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
return submitApp(masterMemory, name, user, null, false, null, return submitApp(masterMemory, name, user, null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls) throws Exception { Map<ApplicationAccessType, String> acls) throws Exception {
return submitApp(masterMemory, name, user, acls, false, null, return submitApp(masterMemory, name, user, acls, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, String queue) throws Exception { Map<ApplicationAccessType, String> acls, String queue) throws Exception {
return submitApp(masterMemory, name, user, acls, false, queue, return submitApp(masterMemory, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue, Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts) throws Exception { int maxAppAttempts, Credentials ts) throws Exception {
ClientRMProtocol client = getClientRMService(); ClientRMProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class)); .newRecord(GetNewApplicationRequest.class));
@ -175,6 +179,12 @@ public class MockRM extends ResourceManager {
sub.setResource(capability); sub.setResource(capability);
clc.setApplicationACLs(acls); clc.setApplicationACLs(acls);
clc.setUser(user); clc.setUser(user);
if (ts != null && UserGroupInformation.isSecurityEnabled()) {
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
clc.setContainerTokens(securityTokens);
}
sub.setAMContainerSpec(clc); sub.setAMContainerSpec(clc);
req.setApplicationSubmissionContext(sub); req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser = UserGroupInformation fakeUser =
@ -357,6 +367,10 @@ public class MockRM extends ResourceManager {
return this.nodesListManager; return this.nodesListManager;
} }
public RMDelegationTokenSecretManager getRMDTSecretManager() {
return this.rmDTSecretManager;
}
@Override @Override
protected void startWepApp() { protected void startWepApp() {
// override to disable webapp // override to disable webapp

View File

@ -18,11 +18,21 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -33,9 +43,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@ -43,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@ -60,10 +74,8 @@ public class TestRMRestart {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -159,7 +171,7 @@ public class TestRMRestart {
// create unmanaged app // create unmanaged app
RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true,
null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
ApplicationAttemptId unmanagedAttemptId = ApplicationAttemptId unmanagedAttemptId =
appUnmanaged.getCurrentAppAttempt().getAppAttemptId(); appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
// assert appUnmanaged info is saved // assert appUnmanaged info is saved
@ -321,8 +333,7 @@ public class TestRMRestart {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -340,10 +351,12 @@ public class TestRMRestart {
// submit an app with maxAppAttempts equals to 1 // submit an app with maxAppAttempts equals to 1
RMApp app1 = rm1.submitApp(200, "name", "user", RMApp app1 = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", 1); new HashMap<ApplicationAccessType, String>(), false, "default", 1,
null);
// submit an app with maxAppAttempts equals to -1 // submit an app with maxAppAttempts equals to -1
RMApp app2 = rm1.submitApp(200, "name", "user", RMApp app2 = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1); new HashMap<ApplicationAccessType, String>(), false, "default", -1,
null);
// assert app1 info is saved // assert app1 info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId()); ApplicationState appState = rmAppState.get(app1.getApplicationId());
@ -389,4 +402,113 @@ public class TestRMRestart {
rm1.stop(); rm1.stop();
rm2.stop(); rm2.stop();
} }
@Test
public void testTokenRestoredOnRMrestart() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new MyMockRM(conf, memStore);
rm1.start();
HashSet<Token<RMDelegationTokenIdentifier>> tokenSet =
new HashSet<Token<RMDelegationTokenIdentifier>>();
// create an empty credential
Credentials ts = new Credentials();
// create tokens and add into credential
Text userText1 = new Text("user1");
RMDelegationTokenIdentifier dtId1 =
new RMDelegationTokenIdentifier(userText1, new Text("renewer1"),
userText1);
Token<RMDelegationTokenIdentifier> token1 =
new Token<RMDelegationTokenIdentifier>(dtId1,
rm1.getRMDTSecretManager());
ts.addToken(userText1, token1);
tokenSet.add(token1);
Text userText2 = new Text("user2");
RMDelegationTokenIdentifier dtId2 =
new RMDelegationTokenIdentifier(userText2, new Text("renewer2"),
userText2);
Token<RMDelegationTokenIdentifier> token2 =
new Token<RMDelegationTokenIdentifier>(dtId2,
rm1.getRMDTSecretManager());
ts.addToken(userText2, token2);
tokenSet.add(token2);
// submit an app with customized credential
RMApp app = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
// assert app info is saved
ApplicationState appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
// assert delegation tokens are saved
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
Assert.assertEquals(securityTokens, appState
.getApplicationSubmissionContext().getAMContainerSpec()
.getContainerTokens());
// start new RM
MockRM rm2 = new MyMockRM(conf, memStore);
rm2.start();
// verify tokens are properly populated back to DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm1.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens());
// stop the RM
rm1.stop();
rm2.stop();
}
class MyMockRM extends MockRM {
public MyMockRM(Configuration conf, RMStateStore store) {
super(conf, store);
}
@Override
protected void doSecureLogin() throws IOException {
// Do nothing.
}
@Override
protected DelegationTokenRenewer createDelegationTokenRenewer() {
return new DelegationTokenRenewer() {
@Override
protected void renewToken(final DelegationTokenToRenew dttr)
throws IOException {
// Do nothing
}
@Override
protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
throws IOException {
// Do nothing
}
};
}
}
} }