From d78b300ed4c04100614542d3c86edfc40f3a9aa1 Mon Sep 17 00:00:00 2001 From: minni31 Date: Thu, 29 Jul 2021 14:55:39 +0530 Subject: [PATCH] YARN-10841. Fix token reset synchronization for UAM response token. (#3194) YARN-10841. Fix token reset synchronization for UAM response token. Contributed by Minni Mittal --- .../amrmproxy/FederationInterceptor.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index e95594ca46c..c32afee7e9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -1413,8 +1413,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (otherRMAddress.equals(this.homeSubClusterId)) { homeResponse.setAMRMToken(otherResponse.getAMRMToken()); } else { - throw new YarnRuntimeException( - "amrmToken from UAM " + otherRMAddress + " should be null here"); + LOG.warn("amrmToken from UAM {} not null, it should be null here", + otherRMAddress); } } @@ -1691,6 +1691,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { @Override public void callback(AllocateResponse response) { + org.apache.hadoop.yarn.api.records.Token amrmToken = + response.getAMRMToken(); synchronized (asyncResponseSink) { List responses = null; if (asyncResponseSink.containsKey(subClusterId)) { @@ -1700,6 +1702,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { asyncResponseSink.put(subClusterId, responses); } responses.add(response); + + if (this.isUAM) { + // Do not further propagate the new amrmToken for UAM + response.setAMRMToken(null); + } // Notify main thread about the response arrival asyncResponseSink.notifyAll(); } @@ -1716,9 +1723,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // Save the new AMRMToken for the UAM if present // Do this last because it can be slow... - if (this.isUAM && response.getAMRMToken() != null) { + if (this.isUAM && amrmToken != null) { Token newToken = ConverterUtils - .convertFromYarn(response.getAMRMToken(), (Text) null); + .convertFromYarn(amrmToken, (Text) null); // Do not further propagate the new amrmToken for UAM response.setAMRMToken(null);