From 11af08d67a8f1006c0ff270d7b314d3801d280f3 Mon Sep 17 00:00:00 2001 From: Chun Chen Date: Sun, 14 May 2023 21:38:04 +0800 Subject: [PATCH] YARN-11489. Fix memory leak of DelegationTokenRenewer futures in DelegationTokenRenewerPoolTracker. (#5629). Contributed by Chun Chen. Signed-off-by: He Xiaoqiao --- .../security/DelegationTokenRenewer.java | 84 ++++++++++++------- 1 file changed, 56 insertions(+), 28 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index 8040ce9771a..be95572f92a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -123,8 +123,8 @@ public class DelegationTokenRenewer extends AbstractService { private long tokenRenewerThreadTimeout; private long tokenRenewerThreadRetryInterval; private int tokenRenewerThreadRetryMaxAttempts; - private final Map> futures = - new ConcurrentHashMap<>(); + private final LinkedBlockingQueue futures = + new LinkedBlockingQueue<>(); private boolean delegationTokenRenewerPoolTrackerFlag = true; // this config is supposedly not used by end-users. @@ -227,7 +227,7 @@ public class DelegationTokenRenewer extends AbstractService { if (isServiceStarted) { Future future = renewerService.submit(new DelegationTokenRenewerRunnable(evt)); - futures.put(evt, future); + futures.add(new DelegationTokenRenewerFuture(evt, future)); } else { pendingEventQueue.add(evt); int qSize = pendingEventQueue.size(); @@ -998,33 +998,35 @@ public class DelegationTokenRenewer extends AbstractService { @Override public void run() { while (true) { - for (Map.Entry> entry : futures - .entrySet()) { - DelegationTokenRenewerEvent evt = entry.getKey(); - Future future = entry.getValue(); - try { - future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - - // Cancel thread and retry the same event in case of timeout - if (future != null && !future.isDone() && !future.isCancelled()) { - future.cancel(true); - futures.remove(evt); - if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) { - renewalTimer.schedule( - getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt), - tokenRenewerThreadRetryInterval); - } else { - LOG.info( - "Exhausted max retry attempts {} in token renewer " - + "thread for {}", - tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId()); - } + DelegationTokenRenewerFuture dtrf; + try { + dtrf = futures.take(); + } catch (InterruptedException e) { + LOG.debug("DelegationTokenRenewer pool tracker interrupted"); + return; + } + DelegationTokenRenewerEvent evt = dtrf.getEvt(); + Future future = dtrf.getFuture(); + try { + future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // Cancel thread and retry the same event in case of timeout. + if (!future.isDone() && !future.isCancelled()) { + future.cancel(true); + if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) { + renewalTimer.schedule( + getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt), + tokenRenewerThreadRetryInterval); + } else { + LOG.info( + "Exhausted max retry attempts {} in token renewer " + + "thread for {}", + tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId()); } - } catch (Exception e) { - LOG.info("Problem in submitting renew tasks in token renewer " - + "thread.", e); } + } catch (Exception e) { + LOG.info("Problem in submitting renew tasks in token renewer " + + "thread.", e); } } } @@ -1192,6 +1194,32 @@ public class DelegationTokenRenewer extends AbstractService { } } + private static class DelegationTokenRenewerFuture { + private DelegationTokenRenewerEvent evt; + private Future future; + DelegationTokenRenewerFuture(DelegationTokenRenewerEvent evt, + Future future) { + this.future = future; + this.evt = evt; + } + + public DelegationTokenRenewerEvent getEvt() { + return evt; + } + + public void setEvt(DelegationTokenRenewerEvent evt) { + this.evt = evt; + } + + public Future getFuture() { + return future; + } + + public void setFuture(Future future) { + this.future = future; + } + } + // only for testing protected ConcurrentMap, DelegationTokenToRenew> getAllTokens() { return allTokens;