From 700a6aa0a03cf04e92be4644ff8ad9140f7c8e74 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 23 Apr 2013 22:51:19 +0000 Subject: [PATCH] YARN-581. Added a test to verify that app delegation tokens are restored after RM restart. Contributed by Jian He. svn merge --ignore-ancestry -c 1471187 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1471188 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../security/DelegationTokenRenewer.java | 22 ++- .../yarn/server/resourcemanager/MockRM.java | 22 ++- .../server/resourcemanager/TestRMRestart.java | 140 ++++++++++++++++-- 4 files changed, 170 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2a776b094ba..bd5f8f0292e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -100,6 +100,9 @@ Release 2.0.5-beta - UNRELEASED filecache sub-directory under application directory. (Omkar Vinit Joshi via vinodkv) + YARN-581. Added a test to verify that app delegation tokens are restored + after RM restart. (Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index 066a0a5b969..a0370c753e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; + /** * Service to renew application delegation tokens. */ @@ -139,7 +141,8 @@ public class DelegationTokenRenewer extends AbstractService { * class that is used for keeping tracks of DT to renew * */ - private static class DelegationTokenToRenew { + @VisibleForTesting + protected static class DelegationTokenToRenew { public final Token token; public final ApplicationId applicationId; public final Configuration conf; @@ -252,7 +255,16 @@ public class DelegationTokenRenewer extends AbstractService { private void addTokenToList(DelegationTokenToRenew t) { delegationTokens.add(t); } - + + @VisibleForTesting + public Set> getDelegationTokens() { + Set> tokens = new HashSet>(); + for(DelegationTokenToRenew delegationToken : delegationTokens) { + tokens.add(delegationToken.token); + } + return tokens; + } + /** * Add application tokens for renewal. * @param applicationId added application @@ -343,7 +355,8 @@ public class DelegationTokenRenewer extends AbstractService { /** * set task to renew the token */ - private void setTimerForTokenRenewal(DelegationTokenToRenew token) + @VisibleForTesting + protected void setTimerForTokenRenewal(DelegationTokenToRenew token) throws IOException { // calculate timer time @@ -358,7 +371,8 @@ public class DelegationTokenRenewer extends AbstractService { } // renew a token - private void renewToken(final DelegationTokenToRenew dttr) + @VisibleForTesting + protected void renewToken(final DelegationTokenToRenew dttr) throws IOException { // need to use doAs so that http can find the kerberos tgt // NOTE: token renewers should be responsible for the correct UGI! diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index e39f303ce72..c0007078382 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -18,12 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.Map; import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -130,26 +134,26 @@ public class MockRM extends ResourceManager { public RMApp submitApp(int masterMemory, String name, String user) throws Exception { return submitApp(masterMemory, name, user, null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null); } public RMApp submitApp(int masterMemory, String name, String user, Map acls) throws Exception { return submitApp(masterMemory, name, user, acls, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null); } public RMApp submitApp(int masterMemory, String name, String user, Map acls, String queue) throws Exception { return submitApp(masterMemory, name, user, acls, false, queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null); } public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, - int maxAppAttempts) throws Exception { + int maxAppAttempts, Credentials ts) throws Exception { ClientRMProtocol client = getClientRMService(); GetNewApplicationResponse resp = client.getNewApplication(Records .newRecord(GetNewApplicationRequest.class)); @@ -175,6 +179,12 @@ public class MockRM extends ResourceManager { sub.setResource(capability); clc.setApplicationACLs(acls); clc.setUser(user); + if (ts != null && UserGroupInformation.isSecurityEnabled()) { + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + clc.setContainerTokens(securityTokens); + } sub.setAMContainerSpec(clc); req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = @@ -357,6 +367,10 @@ public class MockRM extends ResourceManager { return this.nodesListManager; } + public RMDelegationTokenSecretManager getRMDTSecretManager() { + return this.rmDTSecretManager; + } + @Override protected void startWepApp() { // override to disable webapp diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 4ab8901054c..fded9fbc982 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -18,11 +18,21 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -33,9 +43,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -43,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -60,10 +74,8 @@ public class TestRMRestart { YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, - "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); - conf.set(YarnConfiguration.RM_SCHEDULER, - "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -159,7 +171,7 @@ public class TestRMRestart { // create unmanaged app RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null); ApplicationAttemptId unmanagedAttemptId = appUnmanaged.getCurrentAppAttempt().getAppAttemptId(); // assert appUnmanaged info is saved @@ -321,8 +333,7 @@ public class TestRMRestart { YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, - "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -340,10 +351,12 @@ public class TestRMRestart { // submit an app with maxAppAttempts equals to 1 RMApp app1 = rm1.submitApp(200, "name", "user", - new HashMap(), false, "default", 1); + new HashMap(), false, "default", 1, + null); // submit an app with maxAppAttempts equals to -1 RMApp app2 = rm1.submitApp(200, "name", "user", - new HashMap(), false, "default", -1); + new HashMap(), false, "default", -1, + null); // assert app1 info is saved ApplicationState appState = rmAppState.get(app1.getApplicationId()); @@ -389,4 +402,113 @@ public class TestRMRestart { rm1.stop(); rm2.stop(); } + + @Test + public void testTokenRestoredOnRMrestart() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + + HashSet> tokenSet = + new HashSet>(); + + // create an empty credential + Credentials ts = new Credentials(); + + // create tokens and add into credential + Text userText1 = new Text("user1"); + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + Token token1 = + new Token(dtId1, + rm1.getRMDTSecretManager()); + ts.addToken(userText1, token1); + tokenSet.add(token1); + + Text userText2 = new Text("user2"); + RMDelegationTokenIdentifier dtId2 = + new RMDelegationTokenIdentifier(userText2, new Text("renewer2"), + userText2); + Token token2 = + new Token(dtId2, + rm1.getRMDTSecretManager()); + ts.addToken(userText2, token2); + tokenSet.add(token2); + + // submit an app with customized credential + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, ts); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + + // assert delegation tokens are saved + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + Assert.assertEquals(securityTokens, appState + .getApplicationSubmissionContext().getAMContainerSpec() + .getContainerTokens()); + + // start new RM + MockRM rm2 = new MyMockRM(conf, memStore); + rm2.start(); + + // verify tokens are properly populated back to DelegationTokenRenewer + Assert.assertEquals(tokenSet, rm1.getRMContext() + .getDelegationTokenRenewer().getDelegationTokens()); + + // stop the RM + rm1.stop(); + rm2.stop(); + } + + class MyMockRM extends MockRM { + + public MyMockRM(Configuration conf, RMStateStore store) { + super(conf, store); + } + + @Override + protected void doSecureLogin() throws IOException { + // Do nothing. + } + + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer() { + @Override + protected void renewToken(final DelegationTokenToRenew dttr) + throws IOException { + // Do nothing + } + + @Override + protected void setTimerForTokenRenewal(DelegationTokenToRenew token) + throws IOException { + // Do nothing + } + }; + } + } }