YARN-8705. Refactor the UAM heartbeat thread in preparation for YARN-8696. Contributed by Botong Huang.

This commit is contained in:
Giovanni Matteo Fumarola 2018-08-27 10:32:22 -07:00
parent 7b1fa5693e
commit f152582562
3 changed files with 358 additions and 284 deletions

View File

@ -0,0 +1,227 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
 * Extends Thread and provides an implementation that is used for processing the
 * AM heart beat request asynchronously and sending back the response using the
 * callback method registered with the system.
 *
 * <p>Requests are enqueued by {@link #allocateAsync} from caller threads and
 * drained one at a time by this thread in {@link #run()}. The relayer and UGI
 * used for the RM connection are supplied through setters that may be invoked
 * from threads other than this one, so the corresponding fields are volatile.
 */
public class AMHeartbeatRequestHandler extends Thread {
  public static final Logger LOG =
      LoggerFactory.getLogger(AMHeartbeatRequestHandler.class);

  // Indication flag for the thread to keep running
  private volatile boolean keepRunning;

  private final Configuration conf;
  private final ApplicationId applicationId;

  // Pending heartbeat requests, in arrival order.
  private final BlockingQueue<AsyncAllocateRequestInfo> requestQueue;

  // Set by other threads through the public setters and read by the heartbeat
  // thread, hence volatile (visibility only; no compound atomicity is implied).
  private volatile AMRMClientRelayer rmProxyRelayer;
  private volatile UserGroupInformation userUgi;
  private volatile int lastResponseId;

  /**
   * Create the handler thread. The thread is not started here; callers start
   * it explicitly.
   *
   * @param conf configuration passed along when updating the AMRMToken
   * @param applicationId application this handler heartbeats for, used in
   *          log messages
   */
  public AMHeartbeatRequestHandler(Configuration conf,
      ApplicationId applicationId) {
    super("AMHeartbeatRequestHandler Heartbeat Handler Thread");
    this.setUncaughtExceptionHandler(
        new HeartBeatThreadUncaughtExceptionHandler());
    this.keepRunning = true;

    this.conf = conf;
    this.applicationId = applicationId;
    this.requestQueue = new LinkedBlockingQueue<>();

    // Assigned directly rather than through resetLastResponseId() so that no
    // overridable method runs during construction.
    this.lastResponseId = 0;
  }

  /**
   * Shutdown the thread.
   */
  public void shutdown() {
    this.keepRunning = false;
    // Wake the thread up if it is blocked waiting on the request queue.
    this.interrupt();
  }

  /**
   * Take requests off the queue one by one, forward each to the RM through
   * the relayer and hand the response to the request's callback. Any failure
   * while processing a request is logged and the loop moves on to the next
   * request; the loop ends once {@link #shutdown()} has been called.
   */
  @Override
  public void run() {
    while (keepRunning) {
      AsyncAllocateRequestInfo requestInfo;
      try {
        requestInfo = requestQueue.take();
        if (requestInfo == null) {
          throw new YarnException(
              "Null requestInfo taken from request queue");
        }
        if (!keepRunning) {
          break;
        }

        // change the response id before forwarding the allocate request as we
        // could have different values for each UAM
        AllocateRequest request = requestInfo.getRequest();
        if (request == null) {
          throw new YarnException("Null allocateRequest from requestInfo");
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
              + ((request.getAskList() == null) ? " empty"
                  : request.getAskList().size()));
        }

        request.setResponseId(lastResponseId);
        AllocateResponse response = rmProxyRelayer.allocate(request);
        if (response == null) {
          throw new YarnException("Null allocateResponse from allocate");
        }

        lastResponseId = response.getResponseId();
        // update token if RM has reissued/renewed
        if (response.getAMRMToken() != null) {
          LOG.debug("Received new AMRMToken");
          YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(),
              userUgi, conf);
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug("Received Heartbeat reply from RM. Allocated Containers:"
              + ((response.getAllocatedContainers() == null) ? " empty"
                  : response.getAllocatedContainers().size()));
        }

        if (requestInfo.getCallback() == null) {
          throw new YarnException("Null callback from requestInfo");
        }
        requestInfo.getCallback().callback(response);
      } catch (InterruptedException ex) {
        // The interrupt flag is deliberately not restored here: the loop
        // re-checks keepRunning, and a still-set flag would make the next
        // take() throw again immediately.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Interrupted while waiting for queue", ex);
        }
      } catch (Throwable ex) {
        LOG.warn(
            "Error occurred while processing heart beat for " + applicationId,
            ex);
      }
    }

    LOG.info("AMHeartbeatRequestHandler thread for {} is exiting",
        applicationId);
  }

  /**
   * Reset the lastResponseId to zero.
   */
  public void resetLastResponseId() {
    this.lastResponseId = 0;
  }

  /**
   * Set the AMRMClientRelayer for RM connection.
   *
   * @param relayer the relayer used to send allocate requests
   */
  public void setAMRMClientRelayer(AMRMClientRelayer relayer) {
    this.rmProxyRelayer = relayer;
  }

  /**
   * Set the UGI for RM connection.
   *
   * @param ugi the UGI whose AMRMToken is updated when the RM issues a new one
   */
  public void setUGI(UserGroupInformation ugi) {
    this.userUgi = ugi;
  }

  /**
   * Queues the specified heart beat request; the heartbeat thread later sends
   * it to the resource manager and invokes the callback with the response.
   *
   * <p>If the calling thread is interrupted while enqueueing, the request is
   * not queued and the thread's interrupt status is restored so that the
   * caller can observe the interruption.
   *
   * @param request the allocate request
   * @param callback the callback method for the request
   * @throws YarnException declared for API compatibility; this implementation
   *           does not throw it itself
   */
  public void allocateAsync(AllocateRequest request,
      AsyncCallback<AllocateResponse> callback) throws YarnException {
    try {
      this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
    } catch (InterruptedException ex) {
      // Should not happen as we have MAX_INT queue length
      LOG.debug("Interrupted while waiting to put on response queue", ex);
      // Do not swallow the interruption: re-assert the flag for the caller.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * @return the number of requests currently waiting in the queue
   */
  @VisibleForTesting
  public int getRequestQueueSize() {
    return this.requestQueue.size();
  }

  /**
   * Data structure that encapsulates AllocateRequest and AsyncCallback
   * instance.
   */
  public static class AsyncAllocateRequestInfo {
    private final AllocateRequest request;
    private final AsyncCallback<AllocateResponse> callback;

    /**
     * @param request the allocate request, must not be null
     * @param callback the callback for the response, must not be null
     * @throws IllegalArgumentException if either argument is null
     */
    public AsyncAllocateRequestInfo(AllocateRequest request,
        AsyncCallback<AllocateResponse> callback) {
      Preconditions.checkArgument(request != null,
          "AllocateRequest cannot be null");
      Preconditions.checkArgument(callback != null, "Callback cannot be null");

      this.request = request;
      this.callback = callback;
    }

    public AsyncCallback<AllocateResponse> getCallback() {
      return this.callback;
    }

    public AllocateRequest getRequest() {
      return this.request;
    }
  }

  /**
   * Uncaught exception handler for the background heartbeat thread.
   */
  public class HeartBeatThreadUncaughtExceptionHandler
      implements UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      LOG.error("Heartbeat thread {} for application {} crashed!", t.getName(),
          applicationId, e);
    }
  }
}

View File

@ -19,11 +19,8 @@
package org.apache.hadoop.yarn.server.uam;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -63,9 +60,9 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
@ -89,8 +86,7 @@ public class UnmanagedApplicationManager {
public static final String APP_NAME = "UnmanagedAM";
private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
private AMRequestHandlerThread handlerThread;
private AMHeartbeatRequestHandler heartbeatHandler;
private AMRMClientRelayer rmProxyRelayer;
private ApplicationId applicationId;
private String submitter;
@ -99,7 +95,6 @@ public class UnmanagedApplicationManager {
private String queueName;
private UserGroupInformation userUgi;
private RegisterApplicationMasterRequest registerRequest;
private int lastResponseId;
private ApplicationClientProtocol rmClient;
private long asyncApiPollIntervalMillis;
private RecordFactory recordFactory;
@ -137,8 +132,8 @@ public class UnmanagedApplicationManager {
this.queueName = queueName;
this.submitter = submitter;
this.appNameSuffix = appNameSuffix;
this.handlerThread = new AMRequestHandlerThread();
this.requestQueue = new LinkedBlockingQueue<>();
this.heartbeatHandler =
new AMHeartbeatRequestHandler(this.conf, this.applicationId);
this.rmProxyRelayer = null;
this.connectionInitiated = false;
this.registerRequest = null;
@ -194,6 +189,9 @@ public class UnmanagedApplicationManager {
this.rmProxyRelayer =
new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
this.conf, this.userUgi, amrmToken), this.applicationId);
this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer);
this.heartbeatHandler.setUGI(this.userUgi);
}
/**
@ -215,7 +213,7 @@ public class UnmanagedApplicationManager {
this.applicationId);
RegisterApplicationMasterResponse response =
this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
this.lastResponseId = 0;
this.heartbeatHandler.resetLastResponseId();
for (Container container : response.getContainersFromPreviousAttempts()) {
LOG.debug("RegisterUAM returned existing running container "
@ -227,12 +225,9 @@ public class UnmanagedApplicationManager {
}
// Only when register succeed that we start the heartbeat thread
this.handlerThread.setUncaughtExceptionHandler(
new HeartBeatThreadUncaughtExceptionHandler());
this.handlerThread.setDaemon(true);
this.handlerThread.start();
this.heartbeatHandler.setDaemon(true);
this.heartbeatHandler.start();
this.lastResponseId = 0;
return response;
}
@ -248,7 +243,7 @@ public class UnmanagedApplicationManager {
FinishApplicationMasterRequest request)
throws YarnException, IOException {
this.handlerThread.shutdown();
this.heartbeatHandler.shutdown();
if (this.rmProxyRelayer == null) {
if (this.connectionInitiated) {
@ -277,7 +272,7 @@ public class UnmanagedApplicationManager {
KillApplicationRequest request =
KillApplicationRequest.newInstance(this.applicationId);
this.handlerThread.shutdown();
this.heartbeatHandler.shutdown();
if (this.rmClient == null) {
this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
@ -296,12 +291,8 @@ public class UnmanagedApplicationManager {
*/
public void allocateAsync(AllocateRequest request,
AsyncCallback<AllocateResponse> callback) throws YarnException {
try {
this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
} catch (InterruptedException ex) {
// Should not happen as we have MAX_INT queue length
LOG.debug("Interrupted while waiting to put on response queue", ex);
}
this.heartbeatHandler.allocateAsync(request, callback);
// Two possible cases why the UAM is not successfully registered yet:
// 1. launchUAM is not called at all. Should throw here.
// 2. launchUAM is called but hasn't successfully returned.
@ -519,139 +510,8 @@ public class UnmanagedApplicationManager {
return this.rmClient.getApplicationReport(request).getApplicationReport();
}
/**
* Data structure that encapsulates AllocateRequest and AsyncCallback
* instance.
*/
public static class AsyncAllocateRequestInfo {
private AllocateRequest request;
private AsyncCallback<AllocateResponse> callback;
public AsyncAllocateRequestInfo(AllocateRequest request,
AsyncCallback<AllocateResponse> callback) {
Preconditions.checkArgument(request != null,
"AllocateRequest cannot be null");
Preconditions.checkArgument(callback != null, "Callback cannot be null");
this.request = request;
this.callback = callback;
}
public AsyncCallback<AllocateResponse> getCallback() {
return this.callback;
}
public AllocateRequest getRequest() {
return this.request;
}
}
@VisibleForTesting
public int getRequestQueueSize() {
return this.requestQueue.size();
}
/**
* Extends Thread and provides an implementation that is used for processing
* the AM heart beat request asynchronously and sending back the response
* using the callback method registered with the system.
*/
public class AMRequestHandlerThread extends Thread {
// Indication flag for the thread to keep running
private volatile boolean keepRunning;
public AMRequestHandlerThread() {
super("UnmanagedApplicationManager Heartbeat Handler Thread");
this.keepRunning = true;
}
/**
* Shutdown the thread.
*/
public void shutdown() {
this.keepRunning = false;
this.interrupt();
}
@Override
public void run() {
while (keepRunning) {
AsyncAllocateRequestInfo requestInfo;
try {
requestInfo = requestQueue.take();
if (requestInfo == null) {
throw new YarnException(
"Null requestInfo taken from request queue");
}
if (!keepRunning) {
break;
}
// change the response id before forwarding the allocate request as we
// could have different values for each UAM
AllocateRequest request = requestInfo.getRequest();
if (request == null) {
throw new YarnException("Null allocateRequest from requestInfo");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
+ ((request.getAskList() == null) ? " empty"
: request.getAskList().size()));
}
request.setResponseId(lastResponseId);
AllocateResponse response = rmProxyRelayer.allocate(request);
if (response == null) {
throw new YarnException("Null allocateResponse from allocate");
}
lastResponseId = response.getResponseId();
// update token if RM has reissued/renewed
if (response.getAMRMToken() != null) {
LOG.debug("Received new AMRMToken");
YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(),
userUgi, conf);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Received Heartbeat reply from RM. Allocated Containers:"
+ ((response.getAllocatedContainers() == null) ? " empty"
: response.getAllocatedContainers().size()));
}
if (requestInfo.getCallback() == null) {
throw new YarnException("Null callback from requestInfo");
}
requestInfo.getCallback().callback(response);
} catch (InterruptedException ex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted while waiting for queue", ex);
}
} catch (IOException ex) {
LOG.warn("IO Error occurred while processing heart beat for "
+ applicationId, ex);
} catch (Throwable ex) {
LOG.warn(
"Error occurred while processing heart beat for " + applicationId,
ex);
}
}
LOG.info("UnmanagedApplicationManager has been stopped for {}. "
+ "AMRequestHandlerThread thread is exiting", applicationId);
}
}
/**
* Uncaught exception handler for the background heartbeat thread.
*/
protected class HeartBeatThreadUncaughtExceptionHandler
implements UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Heartbeat thread {} for application {} crashed!",
t.getName(), applicationId, e);
}
return this.heartbeatHandler.getRequestQueueSize();
}
}

View File

@ -116,6 +116,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
NMSS_CLASS_PREFIX + "secondarySC/";
public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
private ApplicationAttemptId attemptId;
/**
* The home sub-cluster is the sub-cluster where the AM container is running
* in.
@ -124,20 +126,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private SubClusterId homeSubClusterId;
private volatile int lastHomeResponseId;
/**
* A flag for work preserving NM restart. If we just recovered, we need to
* generate an {@link ApplicationMasterNotRegisteredException} exception back
* to AM (similar to what RM will do after its restart/fail-over) in its next
* allocate to trigger AM re-register (which we will shield from RM and just
* return our saved register response) and a full pending requests re-send, so
* that all the {@link AMRMClientRelayer} will be re-populated with all
* pending requests.
*
* TODO: When split-merge is not idempotent, this can lead to some
* over-allocation without a full cancel to RM.
*/
private volatile boolean justRecovered;
/**
* UAM pool for secondary sub-clusters (ones other than home sub-cluster),
* using subClusterId as uamId. One UAM is created per sub-cluster RM except
@ -156,15 +144,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private Map<String, AMRMClientRelayer> secondaryRelayers;
/** Thread pool used for asynchronous operations. */
private ExecutorService threadpool;
/**
* Stores the AllocateResponses that are received asynchronously from all the
* sub-cluster resource managers except the home RM.
*/
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
/** Thread pool used for asynchronous operations. */
private ExecutorService threadpool;
/**
* A flag for work preserving NM restart. If we just recovered, we need to
* generate an {@link ApplicationMasterNotRegisteredException} exception back
* to AM (similar to what RM will do after its restart/fail-over) in its next
* allocate to trigger AM re-register (which we will shield from RM and just
* return our saved register response) and a full pending requests re-send, so
* that all the {@link AMRMClientRelayer} will be re-populated with all
* pending requests.
*
* TODO: When split-merge is not idempotent, this can lead to some
* over-allocation without a full cancel to RM.
*/
private volatile boolean justRecovered;
/**
* Used to keep track of the container Id and the sub cluster RM that created
* the container, so that we know which sub-cluster to forward later requests
@ -179,7 +181,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private RegisterApplicationMasterRequest amRegistrationRequest;
/**
* The original registration response from home RM. This instance is reused
* The original registration response returned to AM. This instance is reused
* for duplicate register request from AM, triggered by timeout between AM and
* AMRMProxy.
*/
@ -247,12 +249,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
this.attemptId = appContext.getApplicationAttemptId();
ApplicationId appId = this.attemptId.getApplicationId();
this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
this.homeRMRelayer = new AMRMClientRelayer(
createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
this.appOwner),
getApplicationContext().getApplicationAttemptId().getApplicationId());
this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
ApplicationMasterProtocol.class, this.appOwner), appId);
this.federationFacade = FederationStateStoreFacade.getInstance();
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@ -267,9 +269,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
@Override
public void recover(Map<String, byte[]> recoveredDataMap) {
super.recover(recoveredDataMap);
ApplicationAttemptId attemptId =
getApplicationContext().getApplicationAttemptId();
LOG.info("Recovering data for FederationInterceptor for {}", attemptId);
LOG.info("Recovering data for FederationInterceptor for {}",
this.attemptId);
if (recoveredDataMap == null) {
return;
}
@ -280,7 +281,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
.parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
this.amRegistrationRequest =
new RegisterApplicationMasterRequestPBImpl(pb);
LOG.info("amRegistrationRequest recovered for {}", attemptId);
LOG.info("amRegistrationRequest recovered for {}", this.attemptId);
// Give the register request to homeRMRelayer for future re-registration
this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
@ -291,7 +292,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
.parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
this.amRegistrationResponse =
new RegisterApplicationMasterResponsePBImpl(pb);
LOG.info("amRegistrationResponse recovered for {}", attemptId);
LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
// Trigger re-register and full pending re-send only if we have a
// saved register response. This should always be true though.
this.justRecovered = true;
@ -301,9 +302,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
Map<String, Token<AMRMTokenIdentifier>> uamMap;
if (this.registryClient != null) {
uamMap = this.registryClient
.loadStateFromRegistry(attemptId.getApplicationId());
.loadStateFromRegistry(this.attemptId.getApplicationId());
LOG.info("Found {} existing UAMs for application {} in Yarn Registry",
uamMap.size(), attemptId.getApplicationId());
uamMap.size(), this.attemptId.getApplicationId());
} else {
uamMap = new HashMap<>();
for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
@ -319,7 +320,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
LOG.info("Found {} existing UAMs for application {} in NMStateStore",
uamMap.size(), attemptId.getApplicationId());
uamMap.size(), this.attemptId.getApplicationId());
}
// Re-attach the UAMs
@ -336,7 +337,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
try {
this.uamPool.reAttachUAM(subClusterId.getId(), config,
attemptId.getApplicationId(),
this.attemptId.getApplicationId(),
this.amRegistrationResponse.getQueue(),
getApplicationContext().getUser(), this.homeSubClusterId.getId(),
entry.getValue());
@ -359,9 +360,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
subClusterId);
} catch (Exception e) {
LOG.error(
"Error reattaching UAM to " + subClusterId + " for " + attemptId,
e);
LOG.error("Error reattaching UAM to " + subClusterId + " for "
+ this.attemptId, e);
}
}
@ -374,8 +374,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
createHomeRMProxy(getApplicationContext(),
ApplicationClientProtocol.class, appSubmitter);
GetContainersResponse response =
rmClient.getContainers(GetContainersRequest.newInstance(attemptId));
GetContainersResponse response = rmClient
.getContainers(GetContainersRequest.newInstance(this.attemptId));
for (ContainerReport container : response.getContainerList()) {
containerIdToSubClusterIdMap.put(container.getContainerId(),
this.homeSubClusterId);
@ -388,7 +388,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
LOG.info(
"In all {} UAMs {} running containers including AM recovered for {}",
uamMap.size(), containers, attemptId);
uamMap.size(), containers, this.attemptId);
if (this.amRegistrationResponse != null) {
// Initialize the AMRMProxyPolicy
@ -439,12 +439,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
RegisterApplicationMasterRequestPBImpl pb =
(RegisterApplicationMasterRequestPBImpl)
this.amRegistrationRequest;
getNMStateStore().storeAMRMProxyAppContextEntry(
getApplicationContext().getApplicationAttemptId(),
getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
} catch (Exception e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ getApplicationContext().getApplicationAttemptId(), e);
+ this.attemptId, e);
}
}
}
@ -479,8 +478,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.homeSubClusterId);
}
ApplicationId appId =
getApplicationContext().getApplicationAttemptId().getApplicationId();
ApplicationId appId = this.attemptId.getApplicationId();
reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
if (getNMStateStore() != null) {
@ -488,12 +486,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
RegisterApplicationMasterResponsePBImpl pb =
(RegisterApplicationMasterResponsePBImpl)
this.amRegistrationResponse;
getNMStateStore().storeAMRMProxyAppContextEntry(
getApplicationContext().getApplicationAttemptId(),
getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
} catch (Exception e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ getApplicationContext().getApplicationAttemptId(), e);
+ this.attemptId, e);
}
}
@ -535,8 +532,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.lastHomeResponseId = request.getResponseId();
throw new ApplicationMasterNotRegisteredException(
"AMRMProxy just restarted and recovered for "
+ getApplicationContext().getApplicationAttemptId()
"AMRMProxy just restarted and recovered for " + this.attemptId
+ ". AM should re-register and full re-send pending requests.");
}
@ -553,8 +549,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (this.justRecovered
|| request.getResponseId() > this.lastHomeResponseId) {
LOG.warn("Setting allocate responseId for {} from {} to {}",
getApplicationContext().getApplicationAttemptId(),
request.getResponseId(), this.lastHomeResponseId);
this.attemptId, request.getResponseId(), this.lastHomeResponseId);
request.setResponseId(this.lastHomeResponseId);
}
@ -573,8 +568,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// Send the request to the home RM and get the response
AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
LOG.info("{} heartbeating to home RM with responseId {}",
getApplicationContext().getApplicationAttemptId(),
LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId,
homeRequest.getResponseId());
AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
@ -612,8 +606,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
LOG.info("{} heartbeat response from home RM with responseId {}",
getApplicationContext().getApplicationAttemptId(),
homeResponse.getResponseId());
this.attemptId, homeResponse.getResponseId());
// Update lastHomeResponseId in three cases:
// 1. The normal responseId increments
@ -676,15 +669,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
secondaryRelayers.remove(subClusterId);
if (getNMStateStore() != null) {
getNMStateStore().removeAMRMProxyAppContextEntry(
getApplicationContext().getApplicationAttemptId(),
getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
NMSS_SECONDARY_SC_PREFIX + subClusterId);
}
}
} catch (Throwable e) {
LOG.warn("Failed to finish unmanaged application master: "
+ "RM address: " + subClusterId + " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId(), e);
+ attemptId, e);
}
return new FinishApplicationMasterResponseInfo(uamResponse,
subClusterId);
@ -720,8 +712,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} catch (Throwable e) {
failedToUnRegister = true;
LOG.warn("Failed to finish unmanaged application master: "
+ " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId(), e);
+ " ApplicationId: " + this.attemptId, e);
}
}
}
@ -733,8 +724,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// attempt will be launched.
this.uamPool.stop();
if (this.registryClient != null) {
this.registryClient.removeAppFromRegistry(getApplicationContext()
.getApplicationAttemptId().getApplicationId());
this.registryClient
.removeAppFromRegistry(this.attemptId.getApplicationId());
}
}
return homeResponse;
@ -755,12 +746,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public void shutdown() {
// Do not stop uamPool service and kill UAMs here because of possible second
// app attempt
if (threadpool != null) {
if (this.threadpool != null) {
try {
threadpool.shutdown();
this.threadpool.shutdown();
} catch (Throwable ex) {
}
threadpool = null;
this.threadpool = null;
}
super.shutdown();
}
@ -1090,59 +1081,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
new AsyncCallback<AllocateResponse>() {
@Override
public void callback(AllocateResponse response) {
synchronized (asyncResponseSink) {
List<AllocateResponse> responses = null;
if (asyncResponseSink.containsKey(subClusterId)) {
responses = asyncResponseSink.get(subClusterId);
} else {
responses = new ArrayList<>();
asyncResponseSink.put(subClusterId, responses);
}
responses.add(response);
}
// Save the new AMRMToken for the UAM if present
if (response.getAMRMToken() != null) {
Token<AMRMTokenIdentifier> newToken = ConverterUtils
.convertFromYarn(response.getAMRMToken(), (Text) null);
// Update the token in registry or NMSS
if (registryClient != null) {
registryClient
.writeAMRMTokenForUAM(
getApplicationContext().getApplicationAttemptId()
.getApplicationId(),
subClusterId.getId(), newToken);
} else if (getNMStateStore() != null) {
try {
getNMStateStore().storeAMRMProxyAppContextEntry(
getApplicationContext().getApplicationAttemptId(),
NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
newToken.encodeToUrlString()
.getBytes(STRING_TO_BYTE_FORMAT));
} catch (IOException e) {
LOG.error(
"Error storing UAM token as AMRMProxy "
+ "context entry in NMSS for "
+ getApplicationContext().getApplicationAttemptId(),
e);
}
}
}
// Notify policy of secondary sub-cluster responses
try {
policyInterpreter.notifyOfResponse(subClusterId, response);
} catch (YarnException e) {
LOG.warn(
"notifyOfResponse for policy failed for home sub-cluster "
+ subClusterId,
e);
}
}
});
new HeartbeatCallBack(subClusterId));
}
return registrations;
@ -1195,7 +1134,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
try {
// For appNameSuffix, use subClusterId of the home sub-cluster
token = uamPool.launchUAM(subClusterId, config,
appContext.getApplicationAttemptId().getApplicationId(),
attemptId.getApplicationId(),
amRegistrationResponse.getQueue(), appContext.getUser(),
homeSubClusterId.toString(), true);
@ -1206,8 +1145,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
registerRequest);
} catch (Throwable e) {
LOG.error("Failed to register application master: "
+ subClusterId + " Application: "
+ appContext.getApplicationAttemptId(), e);
+ subClusterId + " Application: " + attemptId, e);
}
return new RegisterApplicationMasterResponseInfo(uamResponse,
SubClusterId.newInstance(subClusterId), token);
@ -1232,20 +1170,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} else {
LOG.info("Successfully registered unmanaged application master: "
+ uamResponse.getSubClusterId() + " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId());
+ this.attemptId);
successfulRegistrations.put(uamResponse.getSubClusterId(),
uamResponse.getResponse());
// Save the UAM token in registry or NMSS
if (registryClient != null) {
registryClient.writeAMRMTokenForUAM(
getApplicationContext().getApplicationAttemptId()
.getApplicationId(),
this.attemptId.getApplicationId(),
uamResponse.getSubClusterId().getId(),
uamResponse.getUamToken());
} else if (getNMStateStore() != null) {
getNMStateStore().storeAMRMProxyAppContextEntry(
getApplicationContext().getApplicationAttemptId(),
getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
NMSS_SECONDARY_SC_PREFIX
+ uamResponse.getSubClusterId().getId(),
uamResponse.getUamToken().encodeToUrlString()
@ -1254,8 +1190,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
} catch (Exception e) {
LOG.warn("Failed to register unmanaged application master: "
+ " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId(), e);
+ " ApplicationId: " + this.attemptId, e);
}
}
}
@ -1490,9 +1425,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
"Duplicate containerID found in the allocated containers. This"
+ " can happen if the RM epoch is not configured properly."
+ " ContainerId: " + container.getId().toString()
+ " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId()
+ " From RM: " + subClusterId
+ " ApplicationId: " + this.attemptId + " From RM: "
+ subClusterId
+ " . Previous container was from sub-cluster: "
+ existingSubClusterId);
}
@ -1587,6 +1521,59 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
return this.asyncResponseSink;
}
/**
 * Async callback handler for heart beat responses from the sub-cluster it was
 * created for. Each response is appended to the async response sink, any
 * renewed AMRMToken is persisted (registry first, NM state store otherwise),
 * and the policy interpreter is notified of the response.
 */
private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
  private final SubClusterId subClusterId;

  HeartbeatCallBack(SubClusterId subClusterId) {
    this.subClusterId = subClusterId;
  }

  @Override
  public void callback(AllocateResponse response) {
    // The sink is shared with other threads; guard the get-or-create + add.
    synchronized (asyncResponseSink) {
      List<AllocateResponse> responses = asyncResponseSink.get(subClusterId);
      if (responses == null) {
        responses = new ArrayList<>();
        asyncResponseSink.put(subClusterId, responses);
      }
      responses.add(response);
    }

    // Save the new AMRMToken for the UAM if present
    if (response.getAMRMToken() != null) {
      Token<AMRMTokenIdentifier> newToken = ConverterUtils
          .convertFromYarn(response.getAMRMToken(), (Text) null);
      // Update the token in registry or NMSS
      if (registryClient != null) {
        registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
            subClusterId.getId(), newToken);
      } else if (getNMStateStore() != null) {
        try {
          getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
              NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
              newToken.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
        } catch (IOException e) {
          LOG.error("Error storing UAM token as AMRMProxy "
              + "context entry in NMSS for " + attemptId, e);
        }
      }
    }

    // Notify policy of secondary sub-cluster responses
    try {
      policyInterpreter.notifyOfResponse(subClusterId, response);
    } catch (YarnException e) {
      // The callback is attached to UAM heartbeats, so the failing
      // sub-cluster is the one this callback was created for, not the home
      // sub-cluster.
      LOG.warn("notifyOfResponse for policy failed for sub-cluster "
          + subClusterId, e);
    }
  }
}
/**
* Private structure for encapsulating SubClusterId and
* RegisterApplicationMasterResponse instances.