From 1ff3fd33ed6f2ac09c774cc42b0107c5dbd9c19d Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 9 Apr 2015 13:08:53 -0700 Subject: [PATCH] YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token renewal of applications part of a bigger workflow. Contributed by Daryn Sharp. (cherry picked from commit 9c5911294e0ba71aefe4763731b0e780cde9d0ca) --- hadoop-yarn-project/CHANGES.txt | 3 + .../security/DelegationTokenRenewer.java | 137 +++++++++++------- .../security/TestDelegationTokenRenewer.java | 87 ++++++++++- 3 files changed, 173 insertions(+), 54 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6a3999f79b0..048557d6b19 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -867,6 +867,9 @@ Release 2.7.0 - UNRELEASED YARN-3466. Fix RM nodes web page to sort by node HTTP-address, #containers and node-label column (Jason Lowe via wangda) + YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token + renewal of applications part of a bigger workflow. (Daryn Sharp via vinodkv) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index 261997180c4..d49ecfc5e22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -229,15 +230,16 @@ public class DelegationTokenRenewer extends AbstractService { @VisibleForTesting protected static class DelegationTokenToRenew { public final Token token; - public final ApplicationId applicationId; + public final Collection referringAppIds; public final Configuration conf; public long expirationDate; - public TimerTask timerTask; + public RenewalTimerTask timerTask; public volatile boolean shouldCancelAtEnd; public long maxDate; public String user; - public DelegationTokenToRenew(ApplicationId jId, Token token, + public DelegationTokenToRenew(Collection applicationIds, + Token token, Configuration conf, long expirationDate, boolean shouldCancelAtEnd, String user) { this.token = token; @@ -251,20 +253,33 @@ public class DelegationTokenRenewer extends AbstractService { throw new YarnRuntimeException(e); } } - this.applicationId = jId; + this.referringAppIds = Collections.synchronizedSet( + new HashSet(applicationIds)); this.conf = conf; this.expirationDate = expirationDate; this.timerTask = null; this.shouldCancelAtEnd = shouldCancelAtEnd; } - public void setTimerTask(TimerTask tTask) { + public void setTimerTask(RenewalTimerTask tTask) { timerTask = tTask; } - + + @VisibleForTesting + public void cancelTimer() { + if (timerTask != null) { + timerTask.cancel(); + } + } + + @VisibleForTesting + public boolean isTimerCancelled() { + return (timerTask != null) && timerTask.cancelled.get(); + } + @Override public String toString() { - return token + ";exp=" + expirationDate; + return token + ";exp=" + expirationDate + "; apps=" + referringAppIds; } @Override @@ -415,19 +430,16 @@ public class DelegationTokenRenewer extends AbstractService { } DelegationTokenToRenew dttr = allTokens.get(token); - if (dttr != null) { - // If any of the jobs sharing the same token doesn't want to cancel - // the token, we should not cancel the token. - if (!evt.shouldCancelAtEnd) { - dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd; - LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd - + " for token " + dttr.token); + if (dttr == null) { + dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token, + getConfig(), now, shouldCancelAtEnd, evt.getUser()); + try { + renewToken(dttr); + } catch (IOException ioe) { + throw new IOException("Failed to renew token: " + dttr.token, ioe); } - continue; } - - tokenList.add(new DelegationTokenToRenew(applicationId, token, - getConfig(), now, shouldCancelAtEnd, evt.getUser())); + tokenList.add(dttr); } } @@ -436,21 +448,21 @@ public class DelegationTokenRenewer extends AbstractService { // If user provides incorrect token then it should not be added for // renewal. for (DelegationTokenToRenew dtr : tokenList) { - try { - renewToken(dtr); - } catch (IOException ioe) { - throw new IOException("Failed to renew token: " + dtr.token, ioe); + DelegationTokenToRenew currentDtr = + allTokens.putIfAbsent(dtr.token, dtr); + if (currentDtr != null) { + // another job beat us + currentDtr.referringAppIds.add(applicationId); + appTokens.get(applicationId).add(currentDtr); + } else { + appTokens.get(applicationId).add(dtr); + setTimerForTokenRenewal(dtr); } } - for (DelegationTokenToRenew dtr : tokenList) { - appTokens.get(applicationId).add(dtr); - allTokens.put(dtr.token, dtr); - setTimerForTokenRenewal(dtr); - } } if (!hasHdfsToken) { - requestNewHdfsDelegationToken(applicationId, evt.getUser(), + requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(), shouldCancelAtEnd); } } @@ -478,7 +490,7 @@ public class DelegationTokenRenewer extends AbstractService { try { requestNewHdfsDelegationTokenIfNeeded(dttr); // if the token is not replaced by a new token, renew the token - if (appTokens.get(dttr.applicationId).contains(dttr)) { + if (!dttr.isTimerCancelled()) { renewToken(dttr); setTimerForTokenRenewal(dttr);// set the next one } else { @@ -508,12 +520,12 @@ public class DelegationTokenRenewer extends AbstractService { long expiresIn = token.expirationDate - System.currentTimeMillis(); long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration // need to create new task every time - TimerTask tTask = new RenewalTimerTask(token); + RenewalTimerTask tTask = new RenewalTimerTask(token); token.setTimerTask(tTask); // keep reference to the timer renewalTimer.schedule(token.timerTask, new Date(renewIn)); LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = " - + token.applicationId); + + token.referringAppIds); } // renew a token @@ -535,7 +547,7 @@ public class DelegationTokenRenewer extends AbstractService { throw new IOException(e); } LOG.info("Renewed delegation-token= [" + dttr + "], for " - + dttr.applicationId); + + dttr.referringAppIds); } // Request new hdfs token if the token is about to expire, and remove the old @@ -548,30 +560,37 @@ public class DelegationTokenRenewer extends AbstractService { && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining && dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { + final Collection applicationIds; + synchronized (dttr.referringAppIds) { + applicationIds = new HashSet<>(dttr.referringAppIds); + dttr.referringAppIds.clear(); + } // remove all old expiring hdfs tokens for this application. - Set tokenSet = appTokens.get(dttr.applicationId); - if (tokenSet != null && !tokenSet.isEmpty()) { + for (ApplicationId appId : applicationIds) { + Set tokenSet = appTokens.get(appId); + if (tokenSet == null || tokenSet.isEmpty()) { + continue; + } Iterator iter = tokenSet.iterator(); synchronized (tokenSet) { while (iter.hasNext()) { DelegationTokenToRenew t = iter.next(); if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { iter.remove(); - if (t.timerTask != null) { - t.timerTask.cancel(); - } + t.cancelTimer(); LOG.info("Removed expiring token " + t); } } } } LOG.info("Token= (" + dttr + ") is expiring, request new token."); - requestNewHdfsDelegationToken(dttr.applicationId, dttr.user, - dttr.shouldCancelAtEnd); + requestNewHdfsDelegationToken(applicationIds, dttr.user, + dttr.shouldCancelAtEnd); } } - private void requestNewHdfsDelegationToken(ApplicationId applicationId, + private void requestNewHdfsDelegationToken( + Collection referringAppIds, String user, boolean shouldCancelAtEnd) throws IOException, InterruptedException { if (!hasProxyUserPrivileges) { @@ -583,18 +602,20 @@ public class DelegationTokenRenewer extends AbstractService { Token[] newTokens = obtainSystemTokensForUser(user, credentials); // Add new tokens to the toRenew list. - LOG.info("Received new tokens for " + applicationId + ". Received " + LOG.info("Received new tokens for " + referringAppIds + ". Received " + newTokens.length + " tokens."); if (newTokens.length > 0) { for (Token token : newTokens) { if (token.isManaged()) { DelegationTokenToRenew tokenToRenew = - new DelegationTokenToRenew(applicationId, token, getConfig(), + new DelegationTokenToRenew(referringAppIds, token, getConfig(), Time.now(), shouldCancelAtEnd, user); // renew the token to get the next expiration date. renewToken(tokenToRenew); setTimerForTokenRenewal(tokenToRenew); - appTokens.get(applicationId).add(tokenToRenew); + for (ApplicationId applicationId : referringAppIds) { + appTokens.get(applicationId).add(tokenToRenew); + } LOG.info("Received new token " + token); } } @@ -602,7 +623,9 @@ public class DelegationTokenRenewer extends AbstractService { DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer); + for (ApplicationId applicationId : referringAppIds) { + rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer); + } } @VisibleForTesting @@ -644,16 +667,18 @@ public class DelegationTokenRenewer extends AbstractService { * removing failed DT */ private void removeFailedDelegationToken(DelegationTokenToRenew t) { - ApplicationId applicationId = t.applicationId; - LOG.error("removing failed delegation token for appid=" + applicationId - + ";t=" + t.token.getService()); - appTokens.get(applicationId).remove(t); + Collection applicationIds = t.referringAppIds; + synchronized (applicationIds) { + LOG.error("removing failed delegation token for appid=" + applicationIds + + ";t=" + t.token.getService()); + for (ApplicationId applicationId : applicationIds) { + appTokens.get(applicationId).remove(t); + } + } allTokens.remove(t.token); // cancel the timer - if (t.timerTask != null) { - t.timerTask.cancel(); - } + t.cancelTimer(); } /** @@ -706,9 +731,15 @@ public class DelegationTokenRenewer extends AbstractService { + "; token=" + dttr.token.getService()); } + // continue if the app list isn't empty + synchronized(dttr.referringAppIds) { + dttr.referringAppIds.remove(applicationId); + if (!dttr.referringAppIds.isEmpty()) { + continue; + } + } // cancel the timer - if (dttr.timerTask != null) - dttr.timerTask.cancel(); + dttr.cancelTimer(); // cancel the token cancelToken(dttr); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 99a506afaaa..bc9c2951079 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.DelegationTokenToRenew; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Assert; @@ -123,7 +124,7 @@ public class TestDelegationTokenRenewer { counter = 0; lastRenewed = null; tokenToRenewIn2Sec = null; - + cancelled = false; } @Override @@ -1046,4 +1047,88 @@ public class TestDelegationTokenRenewer { delegationTokenRenewer.obtainSystemTokensForUser(user, credentials); Assert.assertEquals(oldCounter, MyFS.getInstanceCounter()); } + + // Test submitting an application with the token obtained by a previously + // submitted application that is set to be cancelled. Token should be + // renewed while all apps are running, and then cancelled when all apps + // complete + @Test (timeout = 30000) + public void testCancelWithMultipleAppSubmissions() throws Exception{ + MockRM rm = new TestSecurityMockRM(conf, null); + rm.start(); + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm1.registerNode(); + + //MyFS fs = (MyFS)FileSystem.get(conf); + //MyToken token1 = fs.getDelegationToken("user123"); + + // create Token1: + Text userText1 = new Text("user"); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + final Token token1 = + new Token(dtId1.getBytes(), + "password1".getBytes(), dtId1.getKind(), new Text("service1")); + + Credentials credentials = new Credentials(); + credentials.addToken(token1.getService(), token1); + + DelegationTokenRenewer renewer = + rm.getRMContext().getDelegationTokenRenewer(); + Assert.assertTrue(renewer.getAllTokens().isEmpty()); + Assert.assertFalse(Renewer.cancelled); + + RMApp app1 = + rm.submitApp(200, "name", "user", null, false, null, 2, credentials, + null, true, false, false, null, 0, null, true); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + DelegationTokenToRenew dttr = renewer.getAllTokens().get(token1); + Assert.assertNotNull(dttr); + Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId())); + RMApp app2 = + rm.submitApp(200, "name", "user", null, false, null, 2, credentials, + null, true, false, false, null, 0, null, true); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING); + Assert.assertTrue(renewer.getAllTokens().containsKey(token1)); + Assert.assertTrue(dttr.referringAppIds.contains(app2.getApplicationId())); + Assert.assertTrue(dttr.referringAppIds.contains(app2.getApplicationId())); + Assert.assertFalse(Renewer.cancelled); + + MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2); + // app2 completes, app1 is still running, check the token is not cancelled + Assert.assertTrue(renewer.getAllTokens().containsKey(token1)); + Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId())); + Assert.assertFalse(dttr.referringAppIds.contains(app2.getApplicationId())); + Assert.assertFalse(dttr.isTimerCancelled()); + Assert.assertFalse(Renewer.cancelled); + + RMApp app3 = + rm.submitApp(200, "name", "user", null, false, null, 2, credentials, + null, true, false, false, null, 0, null, true); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm1); + rm.waitForState(app3.getApplicationId(), RMAppState.RUNNING); + Assert.assertTrue(renewer.getAllTokens().containsKey(token1)); + Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId())); + Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId())); + Assert.assertFalse(dttr.isTimerCancelled()); + Assert.assertFalse(Renewer.cancelled); + + MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1); + Assert.assertTrue(renewer.getAllTokens().containsKey(token1)); + Assert.assertFalse(dttr.referringAppIds.contains(app1.getApplicationId())); + Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId())); + Assert.assertFalse(dttr.isTimerCancelled()); + Assert.assertFalse(Renewer.cancelled); + + MockRM.finishAMAndVerifyAppState(app3, rm, nm1, am3); + Assert.assertFalse(renewer.getAllTokens().containsKey(token1)); + Assert.assertTrue(dttr.referringAppIds.isEmpty()); + Assert.assertTrue(dttr.isTimerCancelled()); + Assert.assertTrue(Renewer.cancelled); + } }