YARN-581. Added a test to verify that app delegation tokens are restored after RM restart. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1471187 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
27e8c86999
commit
cd975d4b64
|
@ -174,6 +174,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||
filecache sub-directory under application directory. (Omkar Vinit Joshi via
|
||||
vinodkv)
|
||||
|
||||
YARN-581. Added a test to verify that app delegation tokens are restored
|
||||
after RM restart. (Jian He via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Service to renew application delegation tokens.
|
||||
*/
|
||||
|
@ -139,7 +141,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
* class that is used for keeping tracks of DT to renew
|
||||
*
|
||||
*/
|
||||
private static class DelegationTokenToRenew {
|
||||
@VisibleForTesting
|
||||
protected static class DelegationTokenToRenew {
|
||||
public final Token<?> token;
|
||||
public final ApplicationId applicationId;
|
||||
public final Configuration conf;
|
||||
|
@ -252,7 +255,16 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
private void addTokenToList(DelegationTokenToRenew t) {
|
||||
delegationTokens.add(t);
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
public Set<Token<?>> getDelegationTokens() {
|
||||
Set<Token<?>> tokens = new HashSet<Token<?>>();
|
||||
for(DelegationTokenToRenew delegationToken : delegationTokens) {
|
||||
tokens.add(delegationToken.token);
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add application tokens for renewal.
|
||||
* @param applicationId added application
|
||||
|
@ -343,7 +355,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
/**
|
||||
* set task to renew the token
|
||||
*/
|
||||
private void setTimerForTokenRenewal(DelegationTokenToRenew token)
|
||||
@VisibleForTesting
|
||||
protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
|
||||
throws IOException {
|
||||
|
||||
// calculate timer time
|
||||
|
@ -358,7 +371,8 @@ public class DelegationTokenRenewer extends AbstractService {
|
|||
}
|
||||
|
||||
// renew a token
|
||||
private void renewToken(final DelegationTokenToRenew dttr)
|
||||
@VisibleForTesting
|
||||
protected void renewToken(final DelegationTokenToRenew dttr)
|
||||
throws IOException {
|
||||
// need to use doAs so that http can find the kerberos tgt
|
||||
// NOTE: token renewers should be responsible for the correct UGI!
|
||||
|
|
|
@ -18,12 +18,15 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Map;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
|
@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
|
@ -130,26 +134,26 @@ public class MockRM extends ResourceManager {
|
|||
public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
|
||||
return submitApp(masterMemory, name, user, null, false, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, false, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls, String queue) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, false, queue,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||
int maxAppAttempts) throws Exception {
|
||||
int maxAppAttempts, Credentials ts) throws Exception {
|
||||
ClientRMProtocol client = getClientRMService();
|
||||
GetNewApplicationResponse resp = client.getNewApplication(Records
|
||||
.newRecord(GetNewApplicationRequest.class));
|
||||
|
@ -175,6 +179,12 @@ public class MockRM extends ResourceManager {
|
|||
sub.setResource(capability);
|
||||
clc.setApplicationACLs(acls);
|
||||
clc.setUser(user);
|
||||
if (ts != null && UserGroupInformation.isSecurityEnabled()) {
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
ts.writeTokenStorageToStream(dob);
|
||||
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
clc.setContainerTokens(securityTokens);
|
||||
}
|
||||
sub.setAMContainerSpec(clc);
|
||||
req.setApplicationSubmissionContext(sub);
|
||||
UserGroupInformation fakeUser =
|
||||
|
@ -357,6 +367,10 @@ public class MockRM extends ResourceManager {
|
|||
return this.nodesListManager;
|
||||
}
|
||||
|
||||
public RMDelegationTokenSecretManager getRMDTSecretManager() {
|
||||
return this.rmDTSecretManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void startWepApp() {
|
||||
// override to disable webapp
|
||||
|
|
|
@ -18,11 +18,21 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
|
@ -33,9 +43,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
|
@ -43,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
|
@ -60,10 +74,8 @@ public class TestRMRestart {
|
|||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
||||
conf.set(YarnConfiguration.RM_STORE,
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
|
||||
conf.set(YarnConfiguration.RM_SCHEDULER,
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
|
||||
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
|
@ -159,7 +171,7 @@ public class TestRMRestart {
|
|||
// create unmanaged app
|
||||
RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true,
|
||||
null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
||||
ApplicationAttemptId unmanagedAttemptId =
|
||||
appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
|
||||
// assert appUnmanaged info is saved
|
||||
|
@ -321,8 +333,7 @@ public class TestRMRestart {
|
|||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
||||
conf.set(YarnConfiguration.RM_STORE,
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||
|
@ -340,10 +351,12 @@ public class TestRMRestart {
|
|||
|
||||
// submit an app with maxAppAttempts equals to 1
|
||||
RMApp app1 = rm1.submitApp(200, "name", "user",
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default", 1);
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default", 1,
|
||||
null);
|
||||
// submit an app with maxAppAttempts equals to -1
|
||||
RMApp app2 = rm1.submitApp(200, "name", "user",
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default", -1);
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
|
||||
null);
|
||||
|
||||
// assert app1 info is saved
|
||||
ApplicationState appState = rmAppState.get(app1.getApplicationId());
|
||||
|
@ -389,4 +402,113 @@ public class TestRMRestart {
|
|||
rm1.stop();
|
||||
rm2.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTokenRestoredOnRMrestart() throws Exception {
|
||||
Logger rootLogger = LogManager.getRootLogger();
|
||||
rootLogger.setLevel(Level.DEBUG);
|
||||
ExitUtil.disableSystemExit();
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
|
||||
Map<ApplicationId, ApplicationState> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
MockRM rm1 = new MyMockRM(conf, memStore);
|
||||
rm1.start();
|
||||
|
||||
HashSet<Token<RMDelegationTokenIdentifier>> tokenSet =
|
||||
new HashSet<Token<RMDelegationTokenIdentifier>>();
|
||||
|
||||
// create an empty credential
|
||||
Credentials ts = new Credentials();
|
||||
|
||||
// create tokens and add into credential
|
||||
Text userText1 = new Text("user1");
|
||||
RMDelegationTokenIdentifier dtId1 =
|
||||
new RMDelegationTokenIdentifier(userText1, new Text("renewer1"),
|
||||
userText1);
|
||||
Token<RMDelegationTokenIdentifier> token1 =
|
||||
new Token<RMDelegationTokenIdentifier>(dtId1,
|
||||
rm1.getRMDTSecretManager());
|
||||
ts.addToken(userText1, token1);
|
||||
tokenSet.add(token1);
|
||||
|
||||
Text userText2 = new Text("user2");
|
||||
RMDelegationTokenIdentifier dtId2 =
|
||||
new RMDelegationTokenIdentifier(userText2, new Text("renewer2"),
|
||||
userText2);
|
||||
Token<RMDelegationTokenIdentifier> token2 =
|
||||
new Token<RMDelegationTokenIdentifier>(dtId2,
|
||||
rm1.getRMDTSecretManager());
|
||||
ts.addToken(userText2, token2);
|
||||
tokenSet.add(token2);
|
||||
|
||||
// submit an app with customized credential
|
||||
RMApp app = rm1.submitApp(200, "name", "user",
|
||||
new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
|
||||
|
||||
// assert app info is saved
|
||||
ApplicationState appState = rmAppState.get(app.getApplicationId());
|
||||
Assert.assertNotNull(appState);
|
||||
|
||||
// assert delegation tokens are saved
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
ts.writeTokenStorageToStream(dob);
|
||||
ByteBuffer securityTokens =
|
||||
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||
Assert.assertEquals(securityTokens, appState
|
||||
.getApplicationSubmissionContext().getAMContainerSpec()
|
||||
.getContainerTokens());
|
||||
|
||||
// start new RM
|
||||
MockRM rm2 = new MyMockRM(conf, memStore);
|
||||
rm2.start();
|
||||
|
||||
// verify tokens are properly populated back to DelegationTokenRenewer
|
||||
Assert.assertEquals(tokenSet, rm1.getRMContext()
|
||||
.getDelegationTokenRenewer().getDelegationTokens());
|
||||
|
||||
// stop the RM
|
||||
rm1.stop();
|
||||
rm2.stop();
|
||||
}
|
||||
|
||||
class MyMockRM extends MockRM {
|
||||
|
||||
public MyMockRM(Configuration conf, RMStateStore store) {
|
||||
super(conf, store);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doSecureLogin() throws IOException {
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DelegationTokenRenewer createDelegationTokenRenewer() {
|
||||
return new DelegationTokenRenewer() {
|
||||
@Override
|
||||
protected void renewToken(final DelegationTokenToRenew dttr)
|
||||
throws IOException {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
|
||||
throws IOException {
|
||||
// Do nothing
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue