From 173664d70f0ed3b1852b6703d32e796778fb1c78 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 18 Dec 2014 23:28:18 +0000 Subject: [PATCH] YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie). Contributed by Jian He (cherry picked from commit 0402bada1989258ecbfdc437cb339322a1f55a97) --- hadoop-yarn-project/CHANGES.txt | 3 + .../security/DelegationTokenRenewer.java | 42 +++++++++++-- .../yarn/server/resourcemanager/MockRM.java | 13 ++-- .../security/TestDelegationTokenRenewer.java | 62 +++++++++++++++++-- 4 files changed, 103 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 47477e2ba0e..07e14f52af9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -212,6 +212,9 @@ Release 2.7.0 - UNRELEASED YARN-2944. InMemorySCMStore can not be instantiated with ReflectionUtils#newInstance. (Chris Trezzo via kasha) + YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie) + (Jian He via jlowe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index f9be1bd9049..dfcceb86122 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; - /** * Service to renew application delegation tokens. */ @@ -94,6 +93,9 @@ public class DelegationTokenRenewer extends AbstractService { private ConcurrentMap> appTokens = new ConcurrentHashMap>(); + private ConcurrentMap, DelegationTokenToRenew> allTokens = + new ConcurrentHashMap, DelegationTokenToRenew>(); + private final ConcurrentMap delayedRemovalMap = new ConcurrentHashMap(); @@ -202,6 +204,7 @@ public class DelegationTokenRenewer extends AbstractService { renewalTimer.cancel(); } appTokens.clear(); + allTokens.clear(); this.renewerService.shutdown(); dtCancelThread.interrupt(); try { @@ -230,7 +233,7 @@ public class DelegationTokenRenewer extends AbstractService { public final Configuration conf; public long expirationDate; public TimerTask timerTask; - public final boolean shouldCancelAtEnd; + public volatile boolean shouldCancelAtEnd; public long maxDate; public String user; @@ -407,12 +410,25 @@ public class DelegationTokenRenewer extends AbstractService { boolean hasHdfsToken = false; for (Token token : tokens) { if (token.isManaged()) { - tokenList.add(new DelegationTokenToRenew(applicationId, - token, getConfig(), now, shouldCancelAtEnd, evt.getUser())); if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) { LOG.info(applicationId + " found existing hdfs token " + token); hasHdfsToken = true; } + + DelegationTokenToRenew dttr = allTokens.get(token); + if (dttr != null) { + // If any of the jobs sharing the same token doesn't want to cancel + // the token, we should not cancel the token. + if (!evt.shouldCancelAtEnd) { + dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd; + LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd + + " for token " + dttr.token); + } + continue; + } + + tokenList.add(new DelegationTokenToRenew(applicationId, token, + getConfig(), now, shouldCancelAtEnd, evt.getUser())); } } @@ -429,6 +445,7 @@ public class DelegationTokenRenewer extends AbstractService { } for (DelegationTokenToRenew dtr : tokenList) { appTokens.get(applicationId).add(dtr); + allTokens.put(dtr.token, dtr); setTimerForTokenRenewal(dtr); } } @@ -496,7 +513,6 @@ public class DelegationTokenRenewer extends AbstractService { token.setTimerTask(tTask); // keep reference to the timer renewalTimer.schedule(token.timerTask, new Date(renewIn)); - LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = " + token.applicationId); } @@ -559,6 +575,10 @@ public class DelegationTokenRenewer extends AbstractService { private void requestNewHdfsDelegationToken(ApplicationId applicationId, String user, boolean shouldCancelAtEnd) throws IOException, InterruptedException { + if (!hasProxyUserPrivileges) { + LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens."); + return; + } // Get new hdfs tokens for this user Credentials credentials = new Credentials(); Token[] newTokens = obtainSystemTokensForUser(user, credentials); @@ -621,6 +641,8 @@ public class DelegationTokenRenewer extends AbstractService { LOG.error("removing failed delegation token for appid=" + applicationId + ";t=" + t.token.getService()); appTokens.get(applicationId).remove(t); + allTokens.remove(t.token); + // cancel the timer if (t.timerTask != null) { t.timerTask.cancel(); @@ -685,9 +707,14 @@ public class DelegationTokenRenewer extends AbstractService { cancelToken(dttr); it.remove(); + allTokens.remove(dttr.token); } } } + + if(tokens != null && tokens.isEmpty()) { + appTokens.remove(applicationId); + } } /** @@ -842,4 +869,9 @@ public class DelegationTokenRenewer extends AbstractService { return appId; } } + + // only for testing + protected ConcurrentMap, DelegationTokenToRenew> getAllTokens() { + return allTokens; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 9d0ac2739bc..5794b435cd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -313,7 +313,7 @@ public class MockRM extends ResourceManager { boolean waitForAccepted, boolean keepContainers) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - false, null, 0, null); + false, null, 0, null, true); } public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) @@ -322,7 +322,7 @@ public class MockRM extends ResourceManager { .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, attemptFailuresValidityInterval, null); + false, null, attemptFailuresValidityInterval, null, true); } public RMApp submitApp(int masterMemory, String name, String user, @@ -332,26 +332,24 @@ public class MockRM extends ResourceManager { ApplicationId applicationId) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - isAppIdProvided, applicationId, 0, null); + isAppIdProvided, applicationId, 0, null, true); } - @SuppressWarnings("deprecation") public RMApp submitApp(int masterMemory, LogAggregationContext logAggregationContext) throws Exception { return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser() .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, 0, logAggregationContext); + false, null, 0, logAggregationContext, true); } - @SuppressWarnings("deprecation") public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId, long attemptFailuresValidityInterval, - LogAggregationContext logAggregationContext) + LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); @@ -392,6 +390,7 @@ public class MockRM extends ResourceManager { if (logAggregationContext != null) { sub.setLogAggregationContext(logAggregationContext); } + sub.setCancelTokensWhenComplete(cancelTokensWhenComplete); req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 7275089c13b..5d31404b4cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityM import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; @@ -116,11 +118,12 @@ public class TestDelegationTokenRenewer { private static int counter = 0; private static Token lastRenewed = null; private static Token tokenToRenewIn2Sec = null; - + private static boolean cancelled = false; private static void reset() { counter = 0; lastRenewed = null; tokenToRenewIn2Sec = null; + } @Override @@ -136,7 +139,8 @@ public class TestDelegationTokenRenewer { @Override public long renew(Token t, Configuration conf) throws IOException { if ( !(t instanceof MyToken)) { - return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT; + // renew in 3 seconds + return System.currentTimeMillis() + 3000; } MyToken token = (MyToken)t; if(token.isCanceled()) { @@ -158,9 +162,12 @@ public class TestDelegationTokenRenewer { @Override public void cancel(Token t, Configuration conf) { - MyToken token = (MyToken)t; - LOG.info("Cancel token " + token); - token.cancelToken(); + cancelled = true; + if (t instanceof MyToken) { + MyToken token = (MyToken) t; + LOG.info("Cancel token " + token); + token.cancelToken(); + } } } @@ -921,6 +928,7 @@ public class TestDelegationTokenRenewer { // YARN will get the token for the app submitted without the delegation token. @Test public void testAppSubmissionWithoutDelegationToken() throws Exception { + conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); // create token2 Text userText2 = new Text("user2"); DelegationTokenIdentifier dtId2 = @@ -970,4 +978,48 @@ public class TestDelegationTokenRenewer { appCredentials.readTokenStorageStream(buf); Assert.assertTrue(appCredentials.getAllTokens().contains(token2)); } + + // Test submitting an application with the token obtained by a previously + // submitted application. + @Test (timeout = 30000) + public void testAppSubmissionWithPreviousToken() throws Exception{ + MockRM rm = new TestSecurityMockRM(conf, null); + rm.start(); + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm1.registerNode(); + + // create Token1: + Text userText1 = new Text("user"); + DelegationTokenIdentifier dtId1 = + new DelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + final Token token1 = + new Token(dtId1.getBytes(), + "password1".getBytes(), dtId1.getKind(), new Text("service1")); + + Credentials credentials = new Credentials(); + credentials.addToken(userText1, token1); + + // submit app1 with a token, set cancelTokenWhenComplete to false; + RMApp app1 = + rm.submitApp(200, "name", "user", null, false, null, 2, credentials, + null, true, false, false, null, 0, null, false); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // submit app2 with the same token, set cancelTokenWhenComplete to true; + RMApp app2 = + rm.submitApp(200, "name", "user", null, false, null, 2, credentials, + null, true, false, false, null, 0, null, true); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING); + MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2); + Assert.assertTrue(rm.getRMContext().getDelegationTokenRenewer() + .getAllTokens().containsKey(token1)); + + MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1); + // app2 completes, app1 is still running, check the token is not cancelled + Assert.assertFalse(Renewer.cancelled); + } }