From 235e84bf3de13141af5f246dc813d4067ba6fadf Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 23 Oct 2015 20:57:01 +0000 Subject: [PATCH] YARN-4041. Slow delegation token renewal can severely prolong RM recovery. Contributed by Sunil G (cherry picked from commit d3a34a4f388155f6a7ef040e244ce7be788cd28b) --- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/rmapp/RMAppImpl.java | 14 ++-- .../security/DelegationTokenRenewer.java | 69 +++++++++++++++++-- .../server/resourcemanager/TestRMRestart.java | 24 +++---- 4 files changed, 86 insertions(+), 24 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1e9423f2345..87bf5606543 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1024,6 +1024,9 @@ Release 2.7.2 - UNRELEASED YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks) + YARN-4041. Slow delegation token renewal can severely prolong RM recovery + (Sunil G via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 43a3a51a467..41254d8124b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -946,14 +946,16 @@ public class RMAppImpl implements RMApp, Recoverable { } if (UserGroupInformation.isSecurityEnabled()) { - // synchronously renew 
delegation token on recovery. + // asynchronously renew delegation token on recovery. try { - app.rmContext.getDelegationTokenRenewer().addApplicationSync( - app.getApplicationId(), app.parseCredentials(), - app.submissionContext.getCancelTokensWhenComplete(), app.getUser()); + app.rmContext.getDelegationTokenRenewer() + .addApplicationAsyncDuringRecovery(app.getApplicationId(), + app.parseCredentials(), + app.submissionContext.getCancelTokensWhenComplete(), + app.getUser()); } catch (Exception e) { - String msg = "Failed to renew token for " + app.applicationId - + " on recovery : " + e.getMessage(); + String msg = "Failed to fetch user credentials from application:" + + e.getMessage(); app.diagnostics.append(msg); LOG.error(msg, e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index 426e460ec6b..cca14e9472f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -387,6 +387,25 @@ public class DelegationTokenRenewer extends AbstractService { applicationId, ts, shouldCancelAtEnd, user)); } + /** + * Asynchronously add application tokens for renewal. + * + * @param applicationId + * added application + * @param ts + * tokens + * @param shouldCancelAtEnd + * true if tokens should be canceled when the app is done else false. 
+ * @param user + * user + */ + public void addApplicationAsyncDuringRecovery(ApplicationId applicationId, + Credentials ts, boolean shouldCancelAtEnd, String user) { + processDelegationTokenRenewerEvent( + new DelegationTokenRenewerAppRecoverEvent(applicationId, ts, + shouldCancelAtEnd, user)); + } + /** * Synchronously renew delegation tokens. * @param user user @@ -398,7 +417,7 @@ public class DelegationTokenRenewer extends AbstractService { applicationId, ts, shouldCancelAtEnd, user)); } - private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) + private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt) throws IOException, InterruptedException { ApplicationId applicationId = evt.getApplicationId(); Credentials ts = evt.getCredentials(); @@ -842,6 +861,10 @@ public class DelegationTokenRenewer extends AbstractService { DelegationTokenRenewerAppSubmitEvent appSubmitEvt = (DelegationTokenRenewerAppSubmitEvent) evt; handleDTRenewerAppSubmitEvent(appSubmitEvt); + } else if (evt instanceof DelegationTokenRenewerAppRecoverEvent) { + DelegationTokenRenewerAppRecoverEvent appRecoverEvt = + (DelegationTokenRenewerAppRecoverEvent) evt; + handleDTRenewerAppRecoverEvent(appRecoverEvt); } else if (evt.getType().equals( DelegationTokenRenewerEventType.FINISH_APPLICATION)) { DelegationTokenRenewer.this.handleAppFinishEvent(evt); @@ -876,17 +899,50 @@ public class DelegationTokenRenewer extends AbstractService { } } } - - static class DelegationTokenRenewerAppSubmitEvent extends + + @SuppressWarnings("unchecked") + private void handleDTRenewerAppRecoverEvent( + DelegationTokenRenewerAppRecoverEvent event) { + try { + // Setup tokens for renewal during recovery + DelegationTokenRenewer.this.handleAppSubmitEvent(event); + } catch (Throwable t) { + LOG.warn( + "Unable to add the application to the delegation token renewer.", t); + } + } + + static class DelegationTokenRenewerAppSubmitEvent + extends + AbstractDelegationTokenRenewerAppEvent { 
+ public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, String user) { + super(appId, credentails, shouldCancelAtEnd, user, + DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); + } + } + + static class DelegationTokenRenewerAppRecoverEvent + extends + AbstractDelegationTokenRenewerAppEvent { + public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, String user) { + super(appId, credentails, shouldCancelAtEnd, user, + DelegationTokenRenewerEventType.RECOVER_APPLICATION); + } + } + + static class AbstractDelegationTokenRenewerAppEvent extends DelegationTokenRenewerEvent { private Credentials credentials; private boolean shouldCancelAtEnd; private String user; - public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, - Credentials credentails, boolean shouldCancelAtEnd, String user) { - super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); + public AbstractDelegationTokenRenewerAppEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, String user, + DelegationTokenRenewerEventType type) { + super(appId, type); this.credentials = credentails; this.shouldCancelAtEnd = shouldCancelAtEnd; this.user = user; @@ -907,6 +963,7 @@ public class DelegationTokenRenewer extends AbstractService { enum DelegationTokenRenewerEventType { VERIFY_AND_START_APPLICATION, + RECOVER_APPLICATION, FINISH_APPLICATION } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 531a4a90f13..cd84208ee26 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1179,24 +1179,24 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // Need to wait for a while as now token renewal happens on another thread // and is asynchronous in nature. - waitForTokensToBeRenewed(rm2); + waitForTokensToBeRenewed(rm2, tokenSet); // verify tokens are properly populated back to rm2 DelegationTokenRenewer Assert.assertEquals(tokenSet, rm2.getRMContext() .getDelegationTokenRenewer().getDelegationTokens()); } - private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { - int waitCnt = 20; - boolean atleastOneAppInNEWState = true; - while (waitCnt-- > 0 && atleastOneAppInNEWState) { - atleastOneAppInNEWState = false; - for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) { - if (rmApp.getState() == RMAppState.NEW) { - Thread.sleep(1000); - atleastOneAppInNEWState = true; - break; - } + private void waitForTokensToBeRenewed(MockRM rm2, + HashSet<Token<RMDelegationTokenIdentifier>> tokenSet) throws Exception { + // Max wait time to get the token renewal can be kept as 1sec (100 * 10ms) + int waitCnt = 100; + while (waitCnt-- > 0) { + if (tokenSet.equals(rm2.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens())) { + // Stop waiting as tokens are populated to DelegationTokenRenewer. + break; + } else { + Thread.sleep(10); } } }