From 0696828a090bc06446f75b29c967697f1d6d845b Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Tue, 21 Jan 2020 13:41:01 -0800 Subject: [PATCH] YARN-9768. RM Renew Delegation token thread should timeout and retry. Contributed by Manikandan R. --- .../hadoop/yarn/conf/YarnConfiguration.java | 14 ++ .../src/main/resources/yarn-default.xml | 24 +++ .../security/DelegationTokenRenewer.java | 144 +++++++++++++- .../security/TestDelegationTokenRenewer.java | 177 +++++++++++++++++- 4 files changed, 355 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 06c3fa4c64a..be7cc89f5da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -729,6 +730,19 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES = 12800; + public static final String RM_DT_RENEWER_THREAD_TIMEOUT = + RM_PREFIX + "delegation-token-renewer.thread-timeout"; + public static final long DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT = + TimeUnit.SECONDS.toMillis(60); // 60 Seconds + public static final String RM_DT_RENEWER_THREAD_RETRY_INTERVAL = + RM_PREFIX + "delegation-token-renewer.thread-retry-interval"; + public static final long DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL = + TimeUnit.SECONDS.toMillis(60); // 60 Seconds + public static final String RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = + RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts"; + public static final int DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS = + 10; + public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c96a7e4cfe5..5277be40b09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -957,6 +957,30 @@ 86400000 + + + RM DelegationTokenRenewer thread timeout + + yarn.resourcemanager.delegation-token-renewer.thread-timeout + 60s + + + + + Default maximum number of retries for each RM DelegationTokenRenewer thread + + yarn.resourcemanager.delegation-token-renewer.thread-retry-max-attempts + 10 + + + + + Time interval between each RM DelegationTokenRenewer thread retry attempt + + yarn.resourcemanager.delegation-token-renewer.thread-retry-interval + 60s + + Thread pool size for RMApplicationHistoryWriter. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index d3ed5032363..fd8935debbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -36,10 +37,12 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -115,6 +118,12 @@ public class DelegationTokenRenewer extends AbstractService { private boolean tokenKeepAliveEnabled; private boolean hasProxyUserPrivileges; private long credentialsValidTimeRemaining; + private long tokenRenewerThreadTimeout; + private long tokenRenewerThreadRetryInterval; + private int tokenRenewerThreadRetryMaxAttempts; + private final Map> futures = + new HashMap<>(); + private boolean delegationTokenRenewerPoolTrackerFlag = true; // this config is supposedly not used by end-users. public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING = @@ -140,6 +149,17 @@ public class DelegationTokenRenewer extends AbstractService { this.credentialsValidTimeRemaining = conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING, DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING); + tokenRenewerThreadTimeout = + conf.getTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT, + TimeUnit.MILLISECONDS); + tokenRenewerThreadRetryInterval = conf.getTimeDuration( + YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, + YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL, + TimeUnit.MILLISECONDS); + tokenRenewerThreadRetryMaxAttempts = + conf.getInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS); setLocalSecretManagerAndServiceAddr(); renewerService = createNewThreadPoolService(conf); pendingEventQueue = new LinkedBlockingQueue(); @@ -184,6 +204,11 @@ public class DelegationTokenRenewer extends AbstractService { serviceStateLock.writeLock().lock(); isServiceStarted = true; serviceStateLock.writeLock().unlock(); + + if (delegationTokenRenewerPoolTrackerFlag) { + renewerService.submit(new DelegationTokenRenewerPoolTracker()); + } + while(!pendingEventQueue.isEmpty()) { processDelegationTokenRenewerEvent(pendingEventQueue.take()); } @@ -195,7 +220,9 @@ public class DelegationTokenRenewer extends AbstractService { serviceStateLock.readLock().lock(); try { if (isServiceStarted) { - renewerService.execute(new DelegationTokenRenewerRunnable(evt)); + Future future = + renewerService.submit(new DelegationTokenRenewerRunnable(evt)); + futures.put(evt, future); } else { pendingEventQueue.add(evt); } @@ -476,7 +503,8 @@ public class DelegationTokenRenewer extends AbstractService { for (Iterator> itor = tokenConf.iterator(); itor.hasNext(); ) { Map.Entry entry = itor.next(); - LOG.info(entry.getKey() + " ===> " + entry.getValue()); + LOG.debug("Token conf key is {} and value is {}", + entry.getKey(), entry.getValue()); } } } else { @@ -894,7 +922,100 @@ public class DelegationTokenRenewer extends AbstractService { public void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } - + + @VisibleForTesting + public void setDelegationTokenRenewerPoolTracker(boolean flag) { + delegationTokenRenewerPoolTrackerFlag = flag; + } + + /** + * Create a timer task to retry the token renewer event which would be + * scheduled at defined intervals based on the configuration. + * + * @param evt + * @return Timer Task + */ + private TimerTask getTimerTask(AbstractDelegationTokenRenewerAppEvent evt) { + return new TimerTask() { + @Override + public void run() { + LOG.info("Retrying token renewer thread for appid = {} and " + + "attempt is {}", evt.getApplicationId(), + evt.getAttempt()); + evt.incrAttempt(); + + Collection> tokens = + evt.getCredentials().getAllTokens(); + for (Token token : tokens) { + DelegationTokenToRenew dttr = allTokens.get(token); + if (dttr != null) { + removeFailedDelegationToken(dttr); + } + } + + DelegationTokenRenewerAppRecoverEvent event = + new DelegationTokenRenewerAppRecoverEvent( + evt.getApplicationId(), evt.getCredentials(), + evt.shouldCancelAtEnd(), evt.getUser(), evt.getTokenConf()); + event.setAttempt(evt.getAttempt()); + processDelegationTokenRenewerEvent(event); + } + }; + } + + /** + * Runnable class to set timeout for futures of all threads running in + * renewerService thread pool executor asynchronously. + * + * In case of timeout exception, retries would be attempted with defined + * intervals till no. of retry attempt reaches max attempt. + */ + private final class DelegationTokenRenewerPoolTracker + implements Runnable { + + DelegationTokenRenewerPoolTracker() { + } + + /** + * Keep traversing of renewer pool threads and wait for specific + * timeout. In case of timeout exception, retry the event till no. of + * attempts reaches max attempts with specific interval. + */ + @Override + public void run() { + while (true) { + for (Map.Entry> entry : futures + .entrySet()) { + DelegationTokenRenewerEvent evt = entry.getKey(); + Future future = entry.getValue(); + try { + future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + + // Cancel thread and retry the same event in case of timeout + if (future != null && !future.isDone() && !future.isCancelled()) { + future.cancel(true); + futures.remove(evt); + if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) { + renewalTimer.schedule( + getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt), + tokenRenewerThreadRetryInterval); + } else { + LOG.info( + "Exhausted max retry attempts {} in token renewer " + + "thread for {}", + tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId()); + } + } + } catch (Exception e) { + LOG.info("Problem in submitting renew tasks in token renewer " + + "thread.", e); + } + } + } + } + } + /* * This will run as a separate thread and will process individual events. It * is done in this way to make sure that the token renewal as a part of @@ -1016,6 +1137,10 @@ public class DelegationTokenRenewer extends AbstractService { public String getUser() { return user; } + + private Configuration getTokenConf() { + return tokenConf; + } } enum DelegationTokenRenewerEventType { @@ -1028,6 +1153,7 @@ public class DelegationTokenRenewer extends AbstractService { AbstractEvent { private ApplicationId appId; + private int attempt = 1; public DelegationTokenRenewerEvent(ApplicationId appId, DelegationTokenRenewerEventType type) { @@ -1038,6 +1164,18 @@ public class DelegationTokenRenewer extends AbstractService { public ApplicationId getApplicationId() { return appId; } + + public void incrAttempt() { + attempt++; + } + + public int getAttempt() { + return attempt; + } + + public void setAttempt(int attempt) { + this.attempt = attempt; + } } // only for testing diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 5f6d4402967..0205460efa4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -42,6 +43,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -93,6 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -230,6 +233,7 @@ public class TestDelegationTokenRenewer { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + delegationTokenRenewer.setDelegationTokenRenewerPoolTracker(false); delegationTokenRenewer.setRMContext(mockContext); delegationTokenRenewer.init(conf); delegationTokenRenewer.start(); @@ -632,6 +636,7 @@ public class TestDelegationTokenRenewer { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + localDtr.setDelegationTokenRenewerPoolTracker(false); localDtr.setRMContext(mockContext); localDtr.init(lconf); localDtr.start(); @@ -712,6 +717,7 @@ public class TestDelegationTokenRenewer { InetSocketAddress sockAddr = InetSocketAddress.createUnresolved("localhost", 1234); when(mockClientRMService.getBindAddress()).thenReturn(sockAddr); + localDtr.setDelegationTokenRenewerPoolTracker(false); localDtr.setRMContext(mockContext); localDtr.init(lconf); localDtr.start(); @@ -1612,4 +1618,173 @@ public class TestDelegationTokenRenewer { // Ensure incrTokenSequenceNo has been called for token renewal as well. Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo(); } -} + + /** + * Test case to ensure token renewer threads are timed out by inducing + * artificial delay. + * + * Because of time out, retries would be attempted till it reaches max retry + * attempt and finally asserted using used threads count. + * + * @throws Exception + */ + @Test(timeout = 30000) + public void testTokenThreadTimeout() throws Exception { + Configuration yarnConf = new YarnConfiguration(); + yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, + true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class, + RMStateStore.class); + yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5, + TimeUnit.SECONDS); + yarnConf.setTimeDuration( + YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5, + TimeUnit.SECONDS); + yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + 3); + UserGroupInformation.setConfiguration(yarnConf); + + Text userText = new Text("user1"); + DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText, + new Text("renewer1"), userText); + final Token originalToken = + new Token<>(dtId.getBytes(), "password1".getBytes(), dtId.getKind(), + new Text("service1")); + + Credentials credentials = new Credentials(); + credentials.addToken(userText, originalToken); + + AtomicBoolean renewDelay = new AtomicBoolean(false); + + // -1 is because of thread allocated to pool tracker runnable tasks + AtomicInteger threadCounter = new AtomicInteger(-1); + renewDelay.set(true); + DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout( + yarnConf, threadCounter, renewDelay); + + MockRM rm = new TestSecurityMockRM(yarnConf) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return renewer; + } + }; + + rm.start(); + rm.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + int attempts = yarnConf.getInt( + YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS); + + GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 2000, + 30000); + + // Ensure no. of threads has been used in renewer service thread pool is + // higher than the configured max retry attempts + assertTrue(threadCounter.get() >= attempts); + rm.close(); + } + + /** + * Test case to ensure token renewer threads are running as usual and finally + * asserted only 1 thread has been used. + * + * @throws Exception + */ + @Test(timeout = 30000) + public void testTokenThreadTimeoutWithoutDelay() throws Exception { + Configuration yarnConf = new YarnConfiguration(); + yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, + true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.set(YarnConfiguration.RM_STORE, + MemoryRMStateStore.class.getName()); + yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5, + TimeUnit.SECONDS); + yarnConf.setTimeDuration( + YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5, + TimeUnit.SECONDS); + yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS, + 3); + UserGroupInformation.setConfiguration(yarnConf); + + Text userText = new Text("user1"); + DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText, + new Text("renewer1"), userText); + final Token originalToken = + new Token<>(dtId.getBytes(), "password1".getBytes(), dtId.getKind(), + new Text("service1")); + + Credentials credentials = new Credentials(); + credentials.addToken(userText, originalToken); + + AtomicBoolean renewDelay = new AtomicBoolean(false); + + // -1 is because of thread allocated to pool tracker runnable tasks + AtomicInteger threadCounter = new AtomicInteger(-1); + DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout( + yarnConf, threadCounter, renewDelay); + + MockRM rm = new TestSecurityMockRM(yarnConf) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return renwer; + } + }; + + rm.start(); + rm.submitApp(200, "name", "user", + new HashMap(), false, "default", 1, + credentials); + + GenericTestUtils.waitFor(() -> threadCounter.get() == 1, 2000, 40000); + + // Ensure only one thread has been used in renewer service thread pool. + assertEquals(threadCounter.get(), 1); + rm.close(); + } + + private DelegationTokenRenewer createNewDelegationTokenRenewerForTimeout( + Configuration config, final AtomicInteger renewerCounter, + final AtomicBoolean renewDelay) { + DelegationTokenRenewer renew = new DelegationTokenRenewer() { + @Override + protected ThreadPoolExecutor createNewThreadPoolService( + Configuration configuration) { + ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 3L, + TimeUnit.SECONDS, new LinkedBlockingQueue()) { + @Override + public Future submit(Runnable r) { + renewerCounter.incrementAndGet(); + return super.submit(r); + } + }; + return pool; + } + + @Override + protected void renewToken(final DelegationTokenToRenew dttr) + throws IOException { + try { + if (renewDelay.get()) { + // Delay for 4 times than the configured timeout + Thread.sleep(config.getTimeDuration( + YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT, + TimeUnit.MILLISECONDS) * 4); + } + super.renewToken(dttr); + } catch (InterruptedException e) { + LOG.info("Sleep Interrupted", e); + } + } + }; + renew.setDelegationTokenRenewerPoolTracker(true); + return renew; + } +} \ No newline at end of file