diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 251e6afa789..3605c5217b9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -91,6 +91,9 @@ Release 2.7.2 - UNRELEASED YARN-4000. RM crashes with NPE if leaf queue becomes parent queue during restart. (Varun Saxena via jianhe) + YARN-4041. Slow delegation token renewal can severely prolong RM recovery + (Sunil G via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index f1ebba922e7..a6fc58b5ec9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -854,14 +854,16 @@ public class RMAppImpl implements RMApp, Recoverable { } if (UserGroupInformation.isSecurityEnabled()) { - // synchronously renew delegation token on recovery. + // asynchronously renew delegation token on recovery. 
try { - app.rmContext.getDelegationTokenRenewer().addApplicationSync( - app.getApplicationId(), app.parseCredentials(), - app.submissionContext.getCancelTokensWhenComplete(), app.getUser()); + app.rmContext.getDelegationTokenRenewer() + .addApplicationAsyncDuringRecovery(app.getApplicationId(), + app.parseCredentials(), + app.submissionContext.getCancelTokensWhenComplete(), + app.getUser()); } catch (Exception e) { - String msg = "Failed to renew token for " + app.applicationId - + " on recovery : " + e.getMessage(); + String msg = "Failed to fetch user credentials from application:" + + e.getMessage(); app.diagnostics.append(msg); LOG.error(msg, e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index c12fb81c948..90627d10ee6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -385,6 +385,25 @@ public class DelegationTokenRenewer extends AbstractService { applicationId, ts, shouldCancelAtEnd, user)); } + /** + * Asynchronously add application tokens for renewal. + * + * @param applicationId + * added application + * @param ts + * tokens + * @param shouldCancelAtEnd + * true if tokens should be canceled when the app is done else false. 
+ * @param user + * user + */ + public void addApplicationAsyncDuringRecovery(ApplicationId applicationId, + Credentials ts, boolean shouldCancelAtEnd, String user) { + processDelegationTokenRenewerEvent( + new DelegationTokenRenewerAppRecoverEvent(applicationId, ts, + shouldCancelAtEnd, user)); + } + /** * Synchronously renew delegation tokens. * @param user user @@ -396,7 +415,7 @@ public class DelegationTokenRenewer extends AbstractService { applicationId, ts, shouldCancelAtEnd, user)); } - private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) + private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt) throws IOException, InterruptedException { ApplicationId applicationId = evt.getApplicationId(); Credentials ts = evt.getCredentials(); @@ -825,6 +844,10 @@ public class DelegationTokenRenewer extends AbstractService { DelegationTokenRenewerAppSubmitEvent appSubmitEvt = (DelegationTokenRenewerAppSubmitEvent) evt; handleDTRenewerAppSubmitEvent(appSubmitEvt); + } else if (evt instanceof DelegationTokenRenewerAppRecoverEvent) { + DelegationTokenRenewerAppRecoverEvent appRecoverEvt = + (DelegationTokenRenewerAppRecoverEvent) evt; + handleDTRenewerAppRecoverEvent(appRecoverEvt); } else if (evt.getType().equals( DelegationTokenRenewerEventType.FINISH_APPLICATION)) { DelegationTokenRenewer.this.handleAppFinishEvent(evt); @@ -859,17 +882,50 @@ public class DelegationTokenRenewer extends AbstractService { } } } - - static class DelegationTokenRenewerAppSubmitEvent extends + + @SuppressWarnings("unchecked") + private void handleDTRenewerAppRecoverEvent( + DelegationTokenRenewerAppRecoverEvent event) { + try { + // Setup tokens for renewal during recovery + DelegationTokenRenewer.this.handleAppSubmitEvent(event); + } catch (Throwable t) { + LOG.warn( + "Unable to add the application to the delegation token renewer.", t); + } + } + + static class DelegationTokenRenewerAppSubmitEvent + extends + AbstractDelegationTokenRenewerAppEvent { 
+ public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, String user) { + super(appId, credentails, shouldCancelAtEnd, user, + DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); + } + } + + static class DelegationTokenRenewerAppRecoverEvent + extends + AbstractDelegationTokenRenewerAppEvent { + public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, String user) { + super(appId, credentails, shouldCancelAtEnd, user, + DelegationTokenRenewerEventType.RECOVER_APPLICATION); + } + } + + static class AbstractDelegationTokenRenewerAppEvent extends DelegationTokenRenewerEvent { private Credentials credentials; private boolean shouldCancelAtEnd; private String user; - public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, - Credentials credentails, boolean shouldCancelAtEnd, String user) { - super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); + public AbstractDelegationTokenRenewerAppEvent(ApplicationId appId, + Credentials credentails, boolean shouldCancelAtEnd, String user, + DelegationTokenRenewerEventType type) { + super(appId, type); this.credentials = credentails; this.shouldCancelAtEnd = shouldCancelAtEnd; this.user = user; @@ -890,6 +946,7 @@ public class DelegationTokenRenewer extends AbstractService { enum DelegationTokenRenewerEventType { VERIFY_AND_START_APPLICATION, + RECOVER_APPLICATION, FINISH_APPLICATION } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index a0b67f6be95..d057498fe00 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1166,24 +1166,24 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { // Need to wait for a while as now token renewal happens on another thread // and is asynchronous in nature. - waitForTokensToBeRenewed(rm2); + waitForTokensToBeRenewed(rm2, tokenSet); // verify tokens are properly populated back to rm2 DelegationTokenRenewer Assert.assertEquals(tokenSet, rm2.getRMContext() .getDelegationTokenRenewer().getDelegationTokens()); } - private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { - int waitCnt = 20; - boolean atleastOneAppInNEWState = true; - while (waitCnt-- > 0 && atleastOneAppInNEWState) { - atleastOneAppInNEWState = false; - for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) { - if (rmApp.getState() == RMAppState.NEW) { - Thread.sleep(1000); - atleastOneAppInNEWState = true; - break; - } + private void waitForTokensToBeRenewed(MockRM rm2, + HashSet<Token<RMDelegationTokenIdentifier>> tokenSet) throws Exception { + // Max wait time to get the token renewal can be kept as 1sec (100 * 10ms) + int waitCnt = 100; + while (waitCnt-- > 0) { + if (tokenSet.equals(rm2.getRMContext().getDelegationTokenRenewer() + .getDelegationTokens())) { + // Stop waiting as tokens are populated to DelegationTokenRenewer. + break; + } else { + Thread.sleep(10); } } }