From f1525825623a1307b5aa55c456b6afa3e0c61135 Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Mon, 27 Aug 2018 10:32:22 -0700 Subject: [PATCH] YARN-8705. Refactor the UAM heartbeat thread in preparation for YARN-8696. Contributed by Botong Huang. --- .../server/AMHeartbeatRequestHandler.java | 227 ++++++++++++++++ .../uam/UnmanagedApplicationManager.java | 170 ++---------- .../amrmproxy/FederationInterceptor.java | 245 +++++++++--------- 3 files changed, 358 insertions(+), 284 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java new file mode 100644 index 00000000000..42227bb61d9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java @@ -0,0 +1,227 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.AsyncCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * Extends Thread and provides an implementation that is used for processing the + * AM heart beat request asynchronously and sending back the response using the + * callback method registered with the system. + */ +public class AMHeartbeatRequestHandler extends Thread { + public static final Logger LOG = + LoggerFactory.getLogger(AMHeartbeatRequestHandler.class); + + // Indication flag for the thread to keep running + private volatile boolean keepRunning; + + private Configuration conf; + private ApplicationId applicationId; + + private BlockingQueue requestQueue; + private AMRMClientRelayer rmProxyRelayer; + private UserGroupInformation userUgi; + private int lastResponseId; + + public AMHeartbeatRequestHandler(Configuration conf, + ApplicationId applicationId) { + super("AMHeartbeatRequestHandler Heartbeat Handler Thread"); + this.setUncaughtExceptionHandler( + new HeartBeatThreadUncaughtExceptionHandler()); + this.keepRunning = true; + + this.conf = conf; + this.applicationId = applicationId; + this.requestQueue = new LinkedBlockingQueue<>(); + + resetLastResponseId(); + } + + /** + * Shutdown the thread. + */ + public void shutdown() { + this.keepRunning = false; + this.interrupt(); + } + + @Override + public void run() { + while (keepRunning) { + AsyncAllocateRequestInfo requestInfo; + try { + requestInfo = requestQueue.take(); + if (requestInfo == null) { + throw new YarnException( + "Null requestInfo taken from request queue"); + } + if (!keepRunning) { + break; + } + + // change the response id before forwarding the allocate request as we + // could have different values for each UAM + AllocateRequest request = requestInfo.getRequest(); + if (request == null) { + throw new YarnException("Null allocateRequest from requestInfo"); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:" + + ((request.getAskList() == null) ? " empty" + : request.getAskList().size())); + } + + request.setResponseId(lastResponseId); + AllocateResponse response = rmProxyRelayer.allocate(request); + if (response == null) { + throw new YarnException("Null allocateResponse from allocate"); + } + + lastResponseId = response.getResponseId(); + // update token if RM has reissued/renewed + if (response.getAMRMToken() != null) { + LOG.debug("Received new AMRMToken"); + YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(), + userUgi, conf); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Received Heartbeat reply from RM. Allocated Containers:" + + ((response.getAllocatedContainers() == null) ? " empty" + : response.getAllocatedContainers().size())); + } + + if (requestInfo.getCallback() == null) { + throw new YarnException("Null callback from requestInfo"); + } + requestInfo.getCallback().callback(response); + } catch (InterruptedException ex) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted while waiting for queue", ex); + } + } catch (Throwable ex) { + LOG.warn( + "Error occurred while processing heart beat for " + applicationId, + ex); + } + } + + LOG.info("AMHeartbeatRequestHandler thread for {} is exiting", + applicationId); + } + + /** + * Reset the lastResponseId to zero. + */ + public void resetLastResponseId() { + this.lastResponseId = 0; + } + + /** + * Set the AMRMClientRelayer for RM connection. + */ + public void setAMRMClientRelayer(AMRMClientRelayer relayer) { + this.rmProxyRelayer = relayer; + } + + /** + * Set the UGI for RM connection. + */ + public void setUGI(UserGroupInformation ugi) { + this.userUgi = ugi; + } + + /** + * Sends the specified heart beat request to the resource manager and invokes + * the callback asynchronously with the response. + * + * @param request the allocate request + * @param callback the callback method for the request + * @throws YarnException if registerAM is not called yet + */ + public void allocateAsync(AllocateRequest request, + AsyncCallback callback) throws YarnException { + try { + this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback)); + } catch (InterruptedException ex) { + // Should not happen as we have MAX_INT queue length + LOG.debug("Interrupted while waiting to put on response queue", ex); + } + } + + @VisibleForTesting + public int getRequestQueueSize() { + return this.requestQueue.size(); + } + + /** + * Data structure that encapsulates AllocateRequest and AsyncCallback + * instance. + */ + public static class AsyncAllocateRequestInfo { + private AllocateRequest request; + private AsyncCallback callback; + + public AsyncAllocateRequestInfo(AllocateRequest request, + AsyncCallback callback) { + Preconditions.checkArgument(request != null, + "AllocateRequest cannot be null"); + Preconditions.checkArgument(callback != null, "Callback cannot be null"); + + this.request = request; + this.callback = callback; + } + + public AsyncCallback getCallback() { + return this.callback; + } + + public AllocateRequest getRequest() { + return this.request; + } + } + + /** + * Uncaught exception handler for the background heartbeat thread. + */ + public class HeartBeatThreadUncaughtExceptionHandler + implements UncaughtExceptionHandler { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Heartbeat thread {} for application {} crashed!", t.getName(), + applicationId, e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index abdec197bbb..78dcfb672b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -19,11 +19,8 @@ package org.apache.hadoop.yarn.server.uam; import java.io.IOException; -import java.lang.Thread.UncaughtExceptionHandler; import java.util.EnumSet; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -63,9 +60,9 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler; import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; @@ -89,8 +86,7 @@ public class UnmanagedApplicationManager { public static final String APP_NAME = "UnmanagedAM"; private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name"; - private BlockingQueue requestQueue; - private AMRequestHandlerThread handlerThread; + private AMHeartbeatRequestHandler heartbeatHandler; private AMRMClientRelayer rmProxyRelayer; private ApplicationId applicationId; private String submitter; @@ -99,7 +95,6 @@ public class UnmanagedApplicationManager { private String queueName; private UserGroupInformation userUgi; private RegisterApplicationMasterRequest registerRequest; - private int lastResponseId; private ApplicationClientProtocol rmClient; private long asyncApiPollIntervalMillis; private RecordFactory recordFactory; @@ -137,8 +132,8 @@ public class UnmanagedApplicationManager { this.queueName = queueName; this.submitter = submitter; this.appNameSuffix = appNameSuffix; - this.handlerThread = new AMRequestHandlerThread(); - this.requestQueue = new LinkedBlockingQueue<>(); + this.heartbeatHandler = + new AMHeartbeatRequestHandler(this.conf, this.applicationId); this.rmProxyRelayer = null; this.connectionInitiated = false; this.registerRequest = null; @@ -194,6 +189,9 @@ public class UnmanagedApplicationManager { this.rmProxyRelayer = new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class, this.conf, this.userUgi, amrmToken), this.applicationId); + + this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer); + this.heartbeatHandler.setUGI(this.userUgi); } /** @@ -215,7 +213,7 @@ public class UnmanagedApplicationManager { this.applicationId); RegisterApplicationMasterResponse response = this.rmProxyRelayer.registerApplicationMaster(this.registerRequest); - this.lastResponseId = 0; + this.heartbeatHandler.resetLastResponseId(); for (Container container : response.getContainersFromPreviousAttempts()) { LOG.debug("RegisterUAM returned existing running container " @@ -227,12 +225,9 @@ public class UnmanagedApplicationManager { } // Only when register succeed that we start the heartbeat thread - this.handlerThread.setUncaughtExceptionHandler( - new HeartBeatThreadUncaughtExceptionHandler()); - this.handlerThread.setDaemon(true); - this.handlerThread.start(); + this.heartbeatHandler.setDaemon(true); + this.heartbeatHandler.start(); - this.lastResponseId = 0; return response; } @@ -248,7 +243,7 @@ public class UnmanagedApplicationManager { FinishApplicationMasterRequest request) throws YarnException, IOException { - this.handlerThread.shutdown(); + this.heartbeatHandler.shutdown(); if (this.rmProxyRelayer == null) { if (this.connectionInitiated) { @@ -277,7 +272,7 @@ public class UnmanagedApplicationManager { KillApplicationRequest request = KillApplicationRequest.newInstance(this.applicationId); - this.handlerThread.shutdown(); + this.heartbeatHandler.shutdown(); if (this.rmClient == null) { this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf, @@ -296,12 +291,8 @@ public class UnmanagedApplicationManager { */ public void allocateAsync(AllocateRequest request, AsyncCallback callback) throws YarnException { - try { - this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback)); - } catch (InterruptedException ex) { - // Should not happen as we have MAX_INT queue length - LOG.debug("Interrupted while waiting to put on response queue", ex); - } + this.heartbeatHandler.allocateAsync(request, callback); + // Two possible cases why the UAM is not successfully registered yet: // 1. launchUAM is not called at all. Should throw here. // 2. launchUAM is called but hasn't successfully returned. @@ -519,139 +510,8 @@ public class UnmanagedApplicationManager { return this.rmClient.getApplicationReport(request).getApplicationReport(); } - /** - * Data structure that encapsulates AllocateRequest and AsyncCallback - * instance. - */ - public static class AsyncAllocateRequestInfo { - private AllocateRequest request; - private AsyncCallback callback; - - public AsyncAllocateRequestInfo(AllocateRequest request, - AsyncCallback callback) { - Preconditions.checkArgument(request != null, - "AllocateRequest cannot be null"); - Preconditions.checkArgument(callback != null, "Callback cannot be null"); - - this.request = request; - this.callback = callback; - } - - public AsyncCallback getCallback() { - return this.callback; - } - - public AllocateRequest getRequest() { - return this.request; - } - } - @VisibleForTesting public int getRequestQueueSize() { - return this.requestQueue.size(); - } - - /** - * Extends Thread and provides an implementation that is used for processing - * the AM heart beat request asynchronously and sending back the response - * using the callback method registered with the system. - */ - public class AMRequestHandlerThread extends Thread { - - // Indication flag for the thread to keep running - private volatile boolean keepRunning; - - public AMRequestHandlerThread() { - super("UnmanagedApplicationManager Heartbeat Handler Thread"); - this.keepRunning = true; - } - - /** - * Shutdown the thread. - */ - public void shutdown() { - this.keepRunning = false; - this.interrupt(); - } - - @Override - public void run() { - while (keepRunning) { - AsyncAllocateRequestInfo requestInfo; - try { - requestInfo = requestQueue.take(); - if (requestInfo == null) { - throw new YarnException( - "Null requestInfo taken from request queue"); - } - if (!keepRunning) { - break; - } - - // change the response id before forwarding the allocate request as we - // could have different values for each UAM - AllocateRequest request = requestInfo.getRequest(); - if (request == null) { - throw new YarnException("Null allocateRequest from requestInfo"); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:" - + ((request.getAskList() == null) ? " empty" - : request.getAskList().size())); - } - - request.setResponseId(lastResponseId); - AllocateResponse response = rmProxyRelayer.allocate(request); - if (response == null) { - throw new YarnException("Null allocateResponse from allocate"); - } - - lastResponseId = response.getResponseId(); - // update token if RM has reissued/renewed - if (response.getAMRMToken() != null) { - LOG.debug("Received new AMRMToken"); - YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(), - userUgi, conf); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Received Heartbeat reply from RM. Allocated Containers:" - + ((response.getAllocatedContainers() == null) ? " empty" - : response.getAllocatedContainers().size())); - } - - if (requestInfo.getCallback() == null) { - throw new YarnException("Null callback from requestInfo"); - } - requestInfo.getCallback().callback(response); - } catch (InterruptedException ex) { - if (LOG.isDebugEnabled()) { - LOG.debug("Interrupted while waiting for queue", ex); - } - } catch (IOException ex) { - LOG.warn("IO Error occurred while processing heart beat for " - + applicationId, ex); - } catch (Throwable ex) { - LOG.warn( - "Error occurred while processing heart beat for " + applicationId, - ex); - } - } - - LOG.info("UnmanagedApplicationManager has been stopped for {}. " - + "AMRequestHandlerThread thread is exiting", applicationId); - } - } - - /** - * Uncaught exception handler for the background heartbeat thread. - */ - protected class HeartBeatThreadUncaughtExceptionHandler - implements UncaughtExceptionHandler { - @Override - public void uncaughtException(Thread t, Throwable e) { - LOG.error("Heartbeat thread {} for application {} crashed!", - t.getName(), applicationId, e); - } + return this.heartbeatHandler.getRequestQueueSize(); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 65a22770390..eb818f1c69e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -116,6 +116,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { NMSS_CLASS_PREFIX + "secondarySC/"; public static final String STRING_TO_BYTE_FORMAT = "UTF-8"; + private ApplicationAttemptId attemptId; + /** * The home sub-cluster is the sub-cluster where the AM container is running * in. @@ -124,20 +126,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private SubClusterId homeSubClusterId; private volatile int lastHomeResponseId; - /** - * A flag for work preserving NM restart. If we just recovered, we need to - * generate an {@link ApplicationMasterNotRegisteredException} exception back - * to AM (similar to what RM will do after its restart/fail-over) in its next - * allocate to trigger AM re-register (which we will shield from RM and just - * return our saved register response) and a full pending requests re-send, so - * that all the {@link AMRMClientRelayer} will be re-populated with all - * pending requests. - * - * TODO: When split-merge is not idempotent, this can lead to some - * over-allocation without a full cancel to RM. - */ - private volatile boolean justRecovered; - /** * UAM pool for secondary sub-clusters (ones other than home sub-cluster), * using subClusterId as uamId. One UAM is created per sub-cluster RM except @@ -156,15 +144,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private Map secondaryRelayers; - /** Thread pool used for asynchronous operations. */ - private ExecutorService threadpool; - /** * Stores the AllocateResponses that are received asynchronously from all the * sub-cluster resource managers except the home RM. */ private Map> asyncResponseSink; + /** Thread pool used for asynchronous operations. */ + private ExecutorService threadpool; + + /** + * A flag for work preserving NM restart. If we just recovered, we need to + * generate an {@link ApplicationMasterNotRegisteredException} exception back + * to AM (similar to what RM will do after its restart/fail-over) in its next + * allocate to trigger AM re-register (which we will shield from RM and just + * return our saved register response) and a full pending requests re-send, so + * that all the {@link AMRMClientRelayer} will be re-populated with all + * pending requests. + * + * TODO: When split-merge is not idempotent, this can lead to some + * over-allocation without a full cancel to RM. + */ + private volatile boolean justRecovered; + /** * Used to keep track of the container Id and the sub cluster RM that created * the container, so that we know which sub-cluster to forward later requests @@ -179,7 +181,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private RegisterApplicationMasterRequest amRegistrationRequest; /** - * The original registration response from home RM. This instance is reused + * The original registration response returned to AM. This instance is reused * for duplicate register request from AM, triggered by timeout between AM and * AMRMProxy. */ @@ -247,12 +249,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } + this.attemptId = appContext.getApplicationAttemptId(); + ApplicationId appId = this.attemptId.getApplicationId(); this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); - this.homeRMRelayer = new AMRMClientRelayer( - createHomeRMProxy(appContext, ApplicationMasterProtocol.class, - this.appOwner), - getApplicationContext().getApplicationAttemptId().getApplicationId()); + this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext, + ApplicationMasterProtocol.class, this.appOwner), appId); this.federationFacade = FederationStateStoreFacade.getInstance(); this.subClusterResolver = this.federationFacade.getSubClusterResolver(); @@ -267,9 +269,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { @Override public void recover(Map recoveredDataMap) { super.recover(recoveredDataMap); - ApplicationAttemptId attemptId = - getApplicationContext().getApplicationAttemptId(); - LOG.info("Recovering data for FederationInterceptor for {}", attemptId); + LOG.info("Recovering data for FederationInterceptor for {}", + this.attemptId); if (recoveredDataMap == null) { return; } @@ -280,7 +281,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { .parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY)); this.amRegistrationRequest = new RegisterApplicationMasterRequestPBImpl(pb); - LOG.info("amRegistrationRequest recovered for {}", attemptId); + LOG.info("amRegistrationRequest recovered for {}", this.attemptId); // Give the register request to homeRMRelayer for future re-registration this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest); @@ -291,7 +292,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { .parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY)); this.amRegistrationResponse = new RegisterApplicationMasterResponsePBImpl(pb); - LOG.info("amRegistrationResponse recovered for {}", attemptId); + LOG.info("amRegistrationResponse recovered for {}", this.attemptId); // Trigger re-register and full pending re-send only if we have a // saved register response. This should always be true though. this.justRecovered = true; @@ -301,9 +302,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { Map> uamMap; if (this.registryClient != null) { uamMap = this.registryClient - .loadStateFromRegistry(attemptId.getApplicationId()); + .loadStateFromRegistry(this.attemptId.getApplicationId()); LOG.info("Found {} existing UAMs for application {} in Yarn Registry", - uamMap.size(), attemptId.getApplicationId()); + uamMap.size(), this.attemptId.getApplicationId()); } else { uamMap = new HashMap<>(); for (Entry entry : recoveredDataMap.entrySet()) { @@ -319,7 +320,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } LOG.info("Found {} existing UAMs for application {} in NMStateStore", - uamMap.size(), attemptId.getApplicationId()); + uamMap.size(), this.attemptId.getApplicationId()); } // Re-attach the UAMs @@ -336,7 +337,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { try { this.uamPool.reAttachUAM(subClusterId.getId(), config, - attemptId.getApplicationId(), + this.attemptId.getApplicationId(), this.amRegistrationResponse.getQueue(), getApplicationContext().getUser(), this.homeSubClusterId.getId(), entry.getValue()); @@ -359,9 +360,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { subClusterId); } catch (Exception e) { - LOG.error( - "Error reattaching UAM to " + subClusterId + " for " + attemptId, - e); + LOG.error("Error reattaching UAM to " + subClusterId + " for " + + this.attemptId, e); } } @@ -374,8 +374,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { createHomeRMProxy(getApplicationContext(), ApplicationClientProtocol.class, appSubmitter); - GetContainersResponse response = - rmClient.getContainers(GetContainersRequest.newInstance(attemptId)); + GetContainersResponse response = rmClient + .getContainers(GetContainersRequest.newInstance(this.attemptId)); for (ContainerReport container : response.getContainerList()) { containerIdToSubClusterIdMap.put(container.getContainerId(), this.homeSubClusterId); @@ -388,7 +388,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { LOG.info( "In all {} UAMs {} running containers including AM recovered for {}", - uamMap.size(), containers, attemptId); + uamMap.size(), containers, this.attemptId); if (this.amRegistrationResponse != null) { // Initialize the AMRMProxyPolicy @@ -439,12 +439,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { RegisterApplicationMasterRequestPBImpl pb = (RegisterApplicationMasterRequestPBImpl) this.amRegistrationRequest; - getNMStateStore().storeAMRMProxyAppContextEntry( - getApplicationContext().getApplicationAttemptId(), + getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId, NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray()); } catch (Exception e) { LOG.error("Error storing AMRMProxy application context entry for " - + getApplicationContext().getApplicationAttemptId(), e); + + this.attemptId, e); } } } @@ -479,8 +478,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.homeSubClusterId); } - ApplicationId appId = - getApplicationContext().getApplicationAttemptId().getApplicationId(); + ApplicationId appId = this.attemptId.getApplicationId(); reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId); if (getNMStateStore() != null) { @@ -488,12 +486,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { RegisterApplicationMasterResponsePBImpl pb = (RegisterApplicationMasterResponsePBImpl) this.amRegistrationResponse; - getNMStateStore().storeAMRMProxyAppContextEntry( - getApplicationContext().getApplicationAttemptId(), + getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId, NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray()); } catch (Exception e) { LOG.error("Error storing AMRMProxy application context entry for " - + getApplicationContext().getApplicationAttemptId(), e); + + this.attemptId, e); } } @@ -535,8 +532,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.lastHomeResponseId = request.getResponseId(); throw new ApplicationMasterNotRegisteredException( - "AMRMProxy just restarted and recovered for " - + getApplicationContext().getApplicationAttemptId() + "AMRMProxy just restarted and recovered for " + this.attemptId + ". AM should re-register and full re-send pending requests."); } @@ -553,8 +549,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (this.justRecovered || request.getResponseId() > this.lastHomeResponseId) { LOG.warn("Setting allocate responseId for {} from {} to {}", - getApplicationContext().getApplicationAttemptId(), - request.getResponseId(), this.lastHomeResponseId); + this.attemptId, request.getResponseId(), this.lastHomeResponseId); request.setResponseId(this.lastHomeResponseId); } @@ -573,8 +568,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // Send the request to the home RM and get the response AllocateRequest homeRequest = requests.get(this.homeSubClusterId); - LOG.info("{} heartbeating to home RM with responseId {}", - getApplicationContext().getApplicationAttemptId(), + LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId, homeRequest.getResponseId()); AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest); @@ -612,8 +606,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } LOG.info("{} heartbeat response from home RM with responseId {}", - getApplicationContext().getApplicationAttemptId(), - homeResponse.getResponseId()); + this.attemptId, homeResponse.getResponseId()); // Update lastHomeResponseId in three cases: // 1. The normal responseId increments @@ -676,15 +669,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { secondaryRelayers.remove(subClusterId); if (getNMStateStore() != null) { - getNMStateStore().removeAMRMProxyAppContextEntry( - getApplicationContext().getApplicationAttemptId(), + getNMStateStore().removeAMRMProxyAppContextEntry(attemptId, NMSS_SECONDARY_SC_PREFIX + subClusterId); } } } catch (Throwable e) { LOG.warn("Failed to finish unmanaged application master: " + "RM address: " + subClusterId + " ApplicationId: " - + getApplicationContext().getApplicationAttemptId(), e); + + attemptId, e); } return new FinishApplicationMasterResponseInfo(uamResponse, subClusterId); @@ -720,8 +712,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } catch (Throwable e) { failedToUnRegister = true; LOG.warn("Failed to finish unmanaged application master: " - + " ApplicationId: " - + getApplicationContext().getApplicationAttemptId(), e); + + " ApplicationId: " + this.attemptId, e); } } } @@ -733,8 +724,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // attempt will be launched. this.uamPool.stop(); if (this.registryClient != null) { - this.registryClient.removeAppFromRegistry(getApplicationContext() - .getApplicationAttemptId().getApplicationId()); + this.registryClient + .removeAppFromRegistry(this.attemptId.getApplicationId()); } } return homeResponse; @@ -755,12 +746,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { public void shutdown() { // Do not stop uamPool service and kill UAMs here because of possible second // app attempt - if (threadpool != null) { + if (this.threadpool != null) { try { - threadpool.shutdown(); + this.threadpool.shutdown(); } catch (Throwable ex) { } - threadpool = null; + this.threadpool = null; } super.shutdown(); } @@ -1090,59 +1081,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(), - new AsyncCallback() { - @Override - public void callback(AllocateResponse response) { - synchronized (asyncResponseSink) { - List responses = null; - if (asyncResponseSink.containsKey(subClusterId)) { - responses = asyncResponseSink.get(subClusterId); - } else { - responses = new ArrayList<>(); - asyncResponseSink.put(subClusterId, responses); - } - responses.add(response); - } - - // Save the new AMRMToken for the UAM if present - if (response.getAMRMToken() != null) { - Token newToken = ConverterUtils - .convertFromYarn(response.getAMRMToken(), (Text) null); - // Update the token in registry or NMSS - if (registryClient != null) { - registryClient - .writeAMRMTokenForUAM( - getApplicationContext().getApplicationAttemptId() - .getApplicationId(), - subClusterId.getId(), newToken); - } else if (getNMStateStore() != null) { - try { - getNMStateStore().storeAMRMProxyAppContextEntry( - getApplicationContext().getApplicationAttemptId(), - NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(), - newToken.encodeToUrlString() - .getBytes(STRING_TO_BYTE_FORMAT)); - } catch (IOException e) { - LOG.error( - "Error storing UAM token as AMRMProxy " - + "context entry in NMSS for " - + getApplicationContext().getApplicationAttemptId(), - e); - } - } - } - - // Notify policy of secondary sub-cluster responses - try { - policyInterpreter.notifyOfResponse(subClusterId, response); - } catch (YarnException e) { - LOG.warn( - "notifyOfResponse for policy failed for home sub-cluster " - + subClusterId, - e); - } - } - }); + new HeartbeatCallBack(subClusterId)); } return registrations; @@ -1195,7 +1134,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { try { // For appNameSuffix, use subClusterId of the home sub-cluster token = uamPool.launchUAM(subClusterId, config, - appContext.getApplicationAttemptId().getApplicationId(), + attemptId.getApplicationId(), amRegistrationResponse.getQueue(), appContext.getUser(), homeSubClusterId.toString(), true); @@ -1206,8 +1145,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { registerRequest); } catch (Throwable e) { LOG.error("Failed to register application master: " - + subClusterId + " Application: " - + appContext.getApplicationAttemptId(), e); + + subClusterId + " Application: " + attemptId, e); } return new RegisterApplicationMasterResponseInfo(uamResponse, SubClusterId.newInstance(subClusterId), token); @@ -1232,20 +1170,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } else { LOG.info("Successfully registered unmanaged application master: " + uamResponse.getSubClusterId() + " ApplicationId: " - + getApplicationContext().getApplicationAttemptId()); + + this.attemptId); successfulRegistrations.put(uamResponse.getSubClusterId(), uamResponse.getResponse()); // Save the UAM token in registry or NMSS if (registryClient != null) { registryClient.writeAMRMTokenForUAM( - getApplicationContext().getApplicationAttemptId() - .getApplicationId(), + this.attemptId.getApplicationId(), uamResponse.getSubClusterId().getId(), uamResponse.getUamToken()); } else if (getNMStateStore() != null) { - getNMStateStore().storeAMRMProxyAppContextEntry( - getApplicationContext().getApplicationAttemptId(), + getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId, NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId().getId(), uamResponse.getUamToken().encodeToUrlString() @@ -1254,8 +1190,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } catch (Exception e) { LOG.warn("Failed to register unmanaged application master: " - + " ApplicationId: " - + getApplicationContext().getApplicationAttemptId(), e); + + " ApplicationId: " + this.attemptId, e); } } } @@ -1490,9 +1425,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { "Duplicate containerID found in the allocated containers. This" + " can happen if the RM epoch is not configured properly." + " ContainerId: " + container.getId().toString() - + " ApplicationId: " - + getApplicationContext().getApplicationAttemptId() - + " From RM: " + subClusterId + + " ApplicationId: " + this.attemptId + " From RM: " + + subClusterId + " . Previous container was from sub-cluster: " + existingSubClusterId); } @@ -1587,6 +1521,59 @@ public class FederationInterceptor extends AbstractRequestInterceptor { return this.asyncResponseSink; } + /** + * Async callback handler for heart beat response from all sub-clusters. + */ + private class HeartbeatCallBack implements AsyncCallback { + private SubClusterId subClusterId; + + HeartbeatCallBack(SubClusterId subClusterId) { + this.subClusterId = subClusterId; + } + + @Override + public void callback(AllocateResponse response) { + synchronized (asyncResponseSink) { + List responses = null; + if (asyncResponseSink.containsKey(subClusterId)) { + responses = asyncResponseSink.get(subClusterId); + } else { + responses = new ArrayList<>(); + asyncResponseSink.put(subClusterId, responses); + } + responses.add(response); + } + + // Save the new AMRMToken for the UAM if present + if (response.getAMRMToken() != null) { + Token newToken = ConverterUtils + .convertFromYarn(response.getAMRMToken(), (Text) null); + // Update the token in registry or NMSS + if (registryClient != null) { + registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(), + subClusterId.getId(), newToken); + } else if (getNMStateStore() != null) { + try { + getNMStateStore().storeAMRMProxyAppContextEntry(attemptId, + NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(), + newToken.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT)); + } catch (IOException e) { + LOG.error("Error storing UAM token as AMRMProxy " + + "context entry in NMSS for " + attemptId, e); + } + } + } + + // Notify policy of secondary sub-cluster responses + try { + policyInterpreter.notifyOfResponse(subClusterId, response); + } catch (YarnException e) { + LOG.warn("notifyOfResponse for policy failed for home sub-cluster " + + subClusterId, e); + } + } + } + /** * Private structure for encapsulating SubClusterId and * RegisterApplicationMasterResponse instances.