From f10ebc67f57a4a2e3cc916c41154ab9b6a4635c9 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 3 Jun 2016 13:00:07 -0700 Subject: [PATCH] YARN-5098. Fixed ResourceManager's DelegationTokenRenewer to replace expiring system-tokens if RM stops and only restarts after a long time. Contributed by Jian He. --- .../security/DelegationTokenRenewer.java | 27 +++-- .../security/TestDelegationTokenRenewer.java | 98 +++++++++++++++++++ 2 files changed, 118 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index fd12f11c789..4177ee21034 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -53,6 +53,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.service.AbstractService; @@ -459,6 +460,18 @@ public class DelegationTokenRenewer extends AbstractService { try { renewToken(dttr); } catch (IOException ioe) { + if (ioe instanceof SecretManager.InvalidToken + && dttr.maxDate < Time.now() + && evt instanceof DelegationTokenRenewerAppRecoverEvent + && token.getKind().equals(HDFS_DELEGATION_KIND)) { + LOG.info("Failed to renew hdfs token " + dttr + + " on recovery as it expired, requesting new hdfs token for " + + applicationId + ", user=" + evt.getUser(), ioe); + requestNewHdfsDelegationTokenAsProxyUser( + Arrays.asList(applicationId), evt.getUser(), + evt.shouldCancelAtEnd()); + continue; + } throw new IOException("Failed to renew token: " + dttr.token, ioe); } } @@ -485,7 +498,8 @@ public class DelegationTokenRenewer extends AbstractService { } if (!hasHdfsToken) { - requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(), + requestNewHdfsDelegationTokenAsProxyUser(Arrays.asList(applicationId), + evt.getUser(), shouldCancelAtEnd); } } @@ -586,8 +600,7 @@ public class DelegationTokenRenewer extends AbstractService { } catch (InterruptedException e) { throw new IOException(e); } - LOG.info("Renewed delegation-token= [" + dttr + "], for " - + dttr.referringAppIds); + LOG.info("Renewed delegation-token= [" + dttr + "]"); } // Request new hdfs token if the token is about to expire, and remove the old @@ -625,12 +638,12 @@ public class DelegationTokenRenewer extends AbstractService { } } LOG.info("Token= (" + dttr + ") is expiring, request new token."); - requestNewHdfsDelegationToken(applicationIds, dttr.user, + requestNewHdfsDelegationTokenAsProxyUser(applicationIds, dttr.user, dttr.shouldCancelAtEnd); } } - private void requestNewHdfsDelegationToken( + private void requestNewHdfsDelegationTokenAsProxyUser( Collection referringAppIds, String user, boolean shouldCancelAtEnd) throws IOException, InterruptedException { @@ -912,8 +925,8 @@ public class DelegationTokenRenewer extends AbstractService { // Setup tokens for renewal during recovery DelegationTokenRenewer.this.handleAppSubmitEvent(event); } catch (Throwable t) { - LOG.warn( - "Unable to add the application to the delegation token renewer.", t); + LOG.warn("Unable to add the application to the delegation token" + + " renewer on recovery.", t); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 1bfac8d9ffa..74fe534d2f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -43,6 +43,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -968,6 +970,101 @@ public class TestDelegationTokenRenewer { Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken)); } + + // 1. token is expired before app completes. + // 2. RM shutdown. + // 3. When RM recovers the app, token renewal will fail as token expired. + // RM should request a new token and sent it to NM for log-aggregation. + @Test + public void testRMRestartWithExpiredToken() throws Exception { + Configuration yarnConf = new YarnConfiguration(); + yarnConf + .setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + yarnConf + .set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + UserGroupInformation.setConfiguration(yarnConf); + + // create Token1: + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token originalToken = + new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(), + new Text("service1")); + Credentials credentials = new Credentials(); + credentials.addToken(userText1, originalToken); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(yarnConf); + MockRM rm1 = new TestSecurityMockRM(yarnConf, memStore); + rm1.start(); + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + // create token2 + Text userText2 = new Text("user1"); + DelegationTokenIdentifier dtId2 = + new DelegationTokenIdentifier(userText1, new Text("renewer2"), + userText2); + final Token updatedToken = + new Token(dtId2.getBytes(), + "password2".getBytes(), dtId2.getKind(), new Text("service2")); + AtomicBoolean firstRenewInvoked = new AtomicBoolean(false); + AtomicBoolean secondRenewInvoked = new AtomicBoolean(false); + MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer() { + + @Override + protected void renewToken(final DelegationTokenToRenew dttr) + throws IOException { + + if (dttr.token.equals(updatedToken)) { + secondRenewInvoked.set(true); + super.renewToken(dttr); + } else if (dttr.token.equals(originalToken)){ + firstRenewInvoked.set(true); + throw new InvalidToken("Failed to renew"); + } else { + throw new IOException("Unexpected"); + } + } + + @Override + protected Token[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException { + credentials.addToken(updatedToken.getService(), updatedToken); + return new Token[] { updatedToken }; + } + }; + } + }; + + // simulating restart the rm + rm2.start(); + + // check nm can retrieve the token + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + nm1.registerNode(); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + ByteBuffer tokenBuffer = + response.getSystemCredentialsForApps().get(app.getApplicationId()); + Assert.assertNotNull(tokenBuffer); + Credentials appCredentials = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokenBuffer.rewind(); + buf.reset(tokenBuffer); + appCredentials.readTokenStorageStream(buf); + Assert.assertTrue(firstRenewInvoked.get() && secondRenewInvoked.get()); + Assert.assertTrue(appCredentials.getAllTokens().contains(updatedToken)); + } + // YARN will get the token for the app submitted without the delegation token. @Test public void testAppSubmissionWithoutDelegationToken() throws Exception { @@ -1158,4 +1255,5 @@ public class TestDelegationTokenRenewer { Assert.assertTrue(dttr.isTimerCancelled()); Assert.assertTrue(Renewer.cancelled); } + }