YARN-7899. [AMRMProxy] Stateful FederationInterceptor for pending requests. Contributed by Botong Huang.
parent 4c54ddd1c2
commit aab9bfc13c
@@ -30,17 +30,7 @@ import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,87 +49,6 @@ public final class AMRMClientUtils {
  private AMRMClientUtils() {
  }

  /**
   * Handle ApplicationNotRegistered exception and re-register.
   *
   * @param appId application Id
   * @param rmProxy RM proxy instance
   * @param registerRequest the AM re-register request
   * @throws YarnException if re-register fails
   */
  public static void handleNotRegisteredExceptionAndReRegister(
      ApplicationId appId, ApplicationMasterProtocol rmProxy,
      RegisterApplicationMasterRequest registerRequest) throws YarnException {
    LOG.info("App attempt {} not registered, most likely due to RM failover. "
        + " Trying to re-register.", appId);
    try {
      rmProxy.registerApplicationMaster(registerRequest);
    } catch (Exception e) {
      if (e instanceof InvalidApplicationMasterRequestException
          && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) {
        LOG.info("Concurrent thread successfully registered, moving on.");
      } else {
        LOG.error("Error trying to re-register AM", e);
        throw new YarnException(e);
      }
    }
  }

  /**
   * Helper method for client calling ApplicationMasterProtocol.allocate that
   * handles re-register if RM fails over.
   *
   * @param request allocate request
   * @param rmProxy RM proxy
   * @param registerRequest the register request for re-register
   * @param appId application id
   * @return allocate response
   * @throws YarnException if RM call fails
   * @throws IOException if RM call fails
   */
  public static AllocateResponse allocateWithReRegister(AllocateRequest request,
      ApplicationMasterProtocol rmProxy,
      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
      throws YarnException, IOException {
    try {
      return rmProxy.allocate(request);
    } catch (ApplicationMasterNotRegisteredException e) {
      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
          registerRequest);
      // reset responseId after re-register
      request.setResponseId(0);
      // retry allocate
      return allocateWithReRegister(request, rmProxy, registerRequest, appId);
    }
  }

  /**
   * Helper method for client calling
   * ApplicationMasterProtocol.finishApplicationMaster that handles re-register
   * if RM fails over.
   *
   * @param request finishApplicationMaster request
   * @param rmProxy RM proxy
   * @param registerRequest the register request for re-register
   * @param appId application id
   * @return finishApplicationMaster response
   * @throws YarnException if RM call fails
   * @throws IOException if RM call fails
   */
  public static FinishApplicationMasterResponse finishAMWithReRegister(
      FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
      throws YarnException, IOException {
    try {
      return rmProxy.finishApplicationMaster(request);
    } catch (ApplicationMasterNotRegisteredException ex) {
      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
          registerRequest);
      // retry finishAM after re-register
      return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
    }
  }

  /**
   * Create a proxy for the specified protocol.
   *
@@ -141,6 +141,11 @@ public class AMRMClientRelayer extends AbstractService
    super.serviceStop();
  }

  public void setAMRegistrationRequest(
      RegisterApplicationMasterRequest registerRequest) {
    this.amRegistrationRequest = registerRequest;
  }

  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request)
@@ -239,8 +244,10 @@ public class AMRMClientRelayer extends AbstractService
        this.change.putAll(this.remotePendingChange);
      }

      // re register with RM, then retry allocate recursively
      // re-register with RM, then retry allocate recursively
      registerApplicationMaster(this.amRegistrationRequest);
      // Reset responseId after re-register
      allocateRequest.setResponseId(0);
      return allocate(allocateRequest);
    }
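Taken together, the relayer entry points touched in these hunks (setAMRegistrationRequest plus registerApplicationMaster, allocate and finishApplicationMaster used elsewhere in this patch) replace the AMRMClientUtils re-register helpers deleted above. A minimal usage sketch assembled only from the calls visible in this commit; it is not code from the commit, and the throws clause is simplified:

import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;

public class RelayerUsageSketch {
  public AllocateResponse heartbeatOnce(ApplicationMasterProtocol rmProxy,
      RegisterApplicationMasterRequest registerRequest,
      AllocateRequest allocateRequest) throws Exception {
    // Wrap the RM proxy once; the relayer keeps track of all pending requests.
    AMRMClientRelayer relayer = new AMRMClientRelayer(rmProxy);

    // Hand over the register request so the relayer can re-register on its
    // own after an RM fail-over, as the deleted allocateWithReRegister did.
    relayer.setAMRegistrationRequest(registerRequest);
    relayer.registerApplicationMaster(registerRequest);

    // On ApplicationMasterNotRegisteredException the relayer re-registers,
    // resets the responseId to 0 and retries the allocate by itself.
    return relayer.allocate(allocateRequest);
  }
}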
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -384,4 +385,19 @@ public class UnmanagedAMPoolManager extends AbstractService {
    return this.unmanagedAppMasterMap.containsKey(uamId);
  }

  /**
   * Return the rmProxy relayer of an UAM.
   *
   * @param uamId uam Id
   * @return the rmProxy relayer
   * @throws YarnException if fails
   */
  public AMRMClientRelayer getAMRMClientRelayer(String uamId)
      throws YarnException {
    if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
      throw new YarnException("UAM " + uamId + " does not exist");
    }
    return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
  }

}
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
@@ -90,7 +91,7 @@ public class UnmanagedApplicationManager {

  private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
  private AMRequestHandlerThread handlerThread;
  private ApplicationMasterProtocol rmProxy;
  private AMRMClientRelayer rmProxyRelayer;
  private ApplicationId applicationId;
  private String submitter;
  private String appNameSuffix;
@@ -138,7 +139,7 @@ public class UnmanagedApplicationManager {
    this.appNameSuffix = appNameSuffix;
    this.handlerThread = new AMRequestHandlerThread();
    this.requestQueue = new LinkedBlockingQueue<>();
    this.rmProxy = null;
    this.rmProxyRelayer = null;
    this.connectionInitiated = false;
    this.registerRequest = null;
    this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
@@ -190,8 +191,9 @@ public class UnmanagedApplicationManager {
      throws IOException {
    this.userUgi = UserGroupInformation.createProxyUser(
        this.applicationId.toString(), UserGroupInformation.getCurrentUser());
    this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
        this.userUgi, amrmToken);
    this.rmProxyRelayer =
        new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
            this.conf, this.userUgi, amrmToken));
  }

  /**
@@ -209,19 +211,18 @@ public class UnmanagedApplicationManager {
    // Save the register request for re-register later
    this.registerRequest = request;

    // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM.
    // We do not expect application already registered exception here
    LOG.info("Registering the Unmanaged application master {}",
        this.applicationId);
    RegisterApplicationMasterResponse response =
        this.rmProxy.registerApplicationMaster(this.registerRequest);
        this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
    this.lastResponseId = 0;

    for (Container container : response.getContainersFromPreviousAttempts()) {
      LOG.info("RegisterUAM returned existing running container "
      LOG.debug("RegisterUAM returned existing running container "
          + container.getId());
    }
    for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
      LOG.info("RegisterUAM returned existing NM token for node "
      LOG.debug("RegisterUAM returned existing NM token for node "
          + nmToken.getNodeId());
    }
@@ -249,7 +250,7 @@ public class UnmanagedApplicationManager {

    this.handlerThread.shutdown();

    if (this.rmProxy == null) {
    if (this.rmProxyRelayer == null) {
      if (this.connectionInitiated) {
        // This is possible if the async launchUAM is still
        // blocked and retrying. Return a dummy response in this case.
@@ -261,8 +262,7 @@ public class UnmanagedApplicationManager {
            + "be called before createAndRegister");
      }
    }
    return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
        this.registerRequest, this.applicationId);
    return this.rmProxyRelayer.finishApplicationMaster(request);
  }

  /**
@@ -308,7 +308,7 @@ public class UnmanagedApplicationManager {
    //
    // In case 2, we have already save the allocate request above, so if the
    // registration succeed later, no request is lost.
    if (this.rmProxy == null) {
    if (this.rmProxyRelayer == null) {
      if (this.connectionInitiated) {
        LOG.info("Unmanaged AM still not successfully launched/registered yet."
            + " Saving the allocate request and send later.");
@@ -328,6 +328,15 @@ public class UnmanagedApplicationManager {
    return this.applicationId;
  }

  /**
   * Returns the rmProxy relayer of this UAM.
   *
   * @return rmProxy relayer of the UAM
   */
  public AMRMClientRelayer getAMRMClientRelayer() {
    return this.rmProxyRelayer;
  }

  /**
   * Returns RM proxy for the specified protocol type. Unit test cases can
   * override this method and return mock proxy instances.
@@ -592,10 +601,7 @@ public class UnmanagedApplicationManager {
        }

        request.setResponseId(lastResponseId);

        AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
            request, rmProxy, registerRequest, applicationId);

        AllocateResponse response = rmProxyRelayer.allocate(request);
        if (response == null) {
          throw new YarnException("Null allocateResponse from allocate");
        }
@@ -245,8 +245,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
    ApplicationAttemptId attemptId = getAppIdentifier();
    LOG.info("Registering application attempt: " + attemptId);

    shouldReRegisterNext = false;

    List<Container> containersFromPreviousAttempt = null;

    synchronized (applicationContainerIdMap) {
@@ -260,7 +258,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
          containersFromPreviousAttempt.add(Container.newInstance(containerId,
              null, null, null, null, null));
        }
      } else {
      } else if (!shouldReRegisterNext) {
        throw new InvalidApplicationMasterRequestException(
            AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
      }
@@ -270,6 +268,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
      }
    }

    shouldReRegisterNext = false;

    // Make sure we wait for certain test cases last in the method
    synchronized (syncObj) {
      syncObj.notifyAll();
@@ -333,13 +333,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,

    validateRunning();

    if (request.getAskList() != null && request.getAskList().size() > 0
        && request.getReleaseList() != null
        && request.getReleaseList().size() > 0) {
      Assert.fail("The mock RM implementation does not support receiving "
          + "askList and releaseList in the same heartbeat");
    }

    ApplicationAttemptId attemptId = getAppIdentifier();
    LOG.info("Allocate from application attempt: " + attemptId);
@@ -62,14 +62,15 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
@@ -106,9 +107,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
  public static final String NMSS_REG_RESPONSE_KEY =
      NMSS_CLASS_PREFIX + "registerResponse";

  /*
  /**
   * When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn
   * Registry. Otherwise if NM recovery is enabled, the UAM token are store in
   * Registry. Otherwise if NM recovery is enabled, the UAM token are stored in
   * local NMSS instead under this directory name.
   */
  public static final String NMSS_SECONDARY_SC_PREFIX =
@@ -119,8 +120,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   * The home sub-cluster is the sub-cluster where the AM container is running
   * in.
   */
  private ApplicationMasterProtocol homeRM;
  private AMRMClientRelayer homeRMRelayer;
  private SubClusterId homeSubClusterId;
  private volatile int lastHomeResponseId;

  /**
   * A flag for work preserving NM restart. If we just recovered, we need to
   * generate an {@link ApplicationMasterNotRegisteredException} exception back
   * to AM (similar to what RM will do after its restart/fail-over) in its next
   * allocate to trigger AM re-register (which we will shield from RM and just
   * return our saved register response) and a full pending requests re-send, so
   * that all the {@link AMRMClientRelayer} will be re-populated with all
   * pending requests.
   *
   * TODO: When split-merge is not idempotent, this can lead to some
   * over-allocation without a full cancel to RM.
   */
  private volatile boolean justRecovered;

  /**
   * UAM pool for secondary sub-clusters (ones other than home sub-cluster),
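The flag documented above drives a re-register handshake with the AM after an NM restart. The AM-side behavior being relied on looks roughly like the sketch below; this is illustrative only (not part of this patch), the class, method and parameter names are hypothetical, and YARN's AMRMClient normally performs the equivalent re-register internally:

import java.util.List;

import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;

public class AmRecoverySketch {
  // Hypothetical AM-side helper, not part of this commit.
  public AllocateResponse allocateWithRecovery(ApplicationMasterProtocol amrmProxy,
      AllocateRequest request, RegisterApplicationMasterRequest registerRequest,
      List<ResourceRequest> allPendingAsks) throws Exception {
    try {
      return amrmProxy.allocate(request);
    } catch (ApplicationMasterNotRegisteredException e) {
      // Re-register: FederationInterceptor answers from its saved register
      // response instead of going back to the home RM.
      amrmProxy.registerApplicationMaster(registerRequest);
      // Full re-send of the pending asks, so the home and secondary
      // AMRMClientRelayers are re-populated with the pending state.
      request.setAskList(allPendingAsks);
      return amrmProxy.allocate(request);
    }
  }
}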
@@ -134,6 +150,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   */
  private UnmanagedAMPoolManager uamPool;

  /**
   * The rmProxy relayers for secondary sub-clusters that keep track of all
   * pending requests.
   */
  private Map<String, AMRMClientRelayer> secondaryRelayers;

  /** Thread pool used for asynchronous operations. */
  private ExecutorService threadpool;
@@ -186,8 +208,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    this.asyncResponseSink = new ConcurrentHashMap<>();
    this.threadpool = Executors.newCachedThreadPool();
    this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
    this.secondaryRelayers = new ConcurrentHashMap<>();
    this.amRegistrationRequest = null;
    this.amRegistrationResponse = null;
    this.lastHomeResponseId = Integer.MAX_VALUE;
    this.justRecovered = false;
  }

  /**
@@ -224,8 +249,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {

    this.homeSubClusterId =
        SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
    this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
        this.appOwner);
    this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
        ApplicationMasterProtocol.class, this.appOwner));

    this.federationFacade = FederationStateStoreFacade.getInstance();
    this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -240,13 +265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
  @Override
  public void recover(Map<String, byte[]> recoveredDataMap) {
    super.recover(recoveredDataMap);
    LOG.info("Recovering data for FederationInterceptor");
    ApplicationAttemptId attemptId =
        getApplicationContext().getApplicationAttemptId();
    LOG.info("Recovering data for FederationInterceptor for {}", attemptId);
    if (recoveredDataMap == null) {
      return;
    }

    ApplicationAttemptId attemptId =
        getApplicationContext().getApplicationAttemptId();
    try {
      if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
        RegisterApplicationMasterRequestProto pb =
@@ -255,6 +279,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
        this.amRegistrationRequest =
            new RegisterApplicationMasterRequestPBImpl(pb);
        LOG.info("amRegistrationRequest recovered for {}", attemptId);

        // Give the register request to homeRMRelayer for future re-registration
        this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
      }
      if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
        RegisterApplicationMasterResponseProto pb =
@@ -263,6 +290,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
        this.amRegistrationResponse =
            new RegisterApplicationMasterResponsePBImpl(pb);
        LOG.info("amRegistrationResponse recovered for {}", attemptId);
        // Trigger re-register and full pending re-send only if we have a
        // saved register response. This should always be true though.
        this.justRecovered = true;
      }

      // Recover UAM amrmTokens from registry or NMSS
@@ -309,6 +339,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
              getApplicationContext().getUser(), this.homeSubClusterId.getId(),
              entry.getValue());

          this.secondaryRelayers.put(subClusterId.getId(),
              this.uamPool.getAMRMClientRelayer(subClusterId.getId()));

          RegisterApplicationMasterResponse response =
              this.uamPool.registerApplicationMaster(subClusterId.getId(),
                  this.amRegistrationRequest);
@@ -436,7 +469,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     * the other sub-cluster RM will be done lazily as needed later.
     */
    this.amRegistrationResponse =
        this.homeRM.registerApplicationMaster(request);
        this.homeRMRelayer.registerApplicationMaster(request);
    if (this.amRegistrationResponse
        .getContainersFromPreviousAttempts() != null) {
      cacheAllocatedContainers(
@@ -495,6 +528,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    Preconditions.checkArgument(this.policyInterpreter != null,
        "Allocate should be called after registerApplicationMaster");

    if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) {
      // Save the responseId home RM is expecting
      this.lastHomeResponseId = request.getResponseId();

      throw new ApplicationMasterNotRegisteredException(
          "AMRMProxy just restarted and recovered for "
              + getApplicationContext().getApplicationAttemptId()
              + ". AM should re-register and full re-send pending requests.");
    }

    // Override responseId in the request in two cases:
    //
    // 1. After we just recovered after an NM restart and AM's responseId is
    // reset due to the exception we generate. We need to override the
    // responseId to the one homeRM expects.
    //
    // 2. After homeRM fail-over, the allocate response with reseted responseId
    // might not be returned successfully back to AM because of RPC connection
    // timeout between AM and AMRMProxy. In this case, we remember and reset the
    // responseId for AM.
    if (this.justRecovered
        || request.getResponseId() > this.lastHomeResponseId) {
      LOG.warn("Setting allocate responseId for {} from {} to {}",
          getApplicationContext().getApplicationAttemptId(),
          request.getResponseId(), this.lastHomeResponseId);
      request.setResponseId(this.lastHomeResponseId);
    }

    try {
      // Split the heart beat request into multiple requests, one for each
      // sub-cluster RM that is used by this application.
@@ -509,10 +570,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
      sendRequestsToSecondaryResourceManagers(requests);

      // Send the request to the home RM and get the response
      AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
          requests.get(this.homeSubClusterId), this.homeRM,
          this.amRegistrationRequest,
          getApplicationContext().getApplicationAttemptId().getApplicationId());
      AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
      LOG.info("{} heartbeating to home RM with responseId {}",
          getApplicationContext().getApplicationAttemptId(),
          homeRequest.getResponseId());

      AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);

      // Reset the flag after the first successful homeRM allocate response,
      // otherwise keep overriding the responseId of new allocate request
      if (this.justRecovered) {
        this.justRecovered = false;
      }

      // Notify policy of home response
      try {
@@ -540,6 +609,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
            newRegistrations.getSuccessfulRegistrations());
      }

      LOG.info("{} heartbeat response from home RM with responseId {}",
          getApplicationContext().getApplicationAttemptId(),
          homeResponse.getResponseId());

      // Update lastHomeResponseId in three cases:
      // 1. The normal responseId increments
      // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails
      // over, AMRMClientRelayer auto re-register and full re-send for homeRM.
      // 3. lastHomeResponseId == MAX_INT. This is the initial case or
      // responseId about to overflow and wrap around
      if (homeResponse.getResponseId() == this.lastHomeResponseId + 1
          || homeResponse.getResponseId() == 1
          || this.lastHomeResponseId == Integer.MAX_VALUE) {
        this.lastHomeResponseId = homeResponse.getResponseId();
      }

      // return the final response to the application master.
      return homeResponse;
    } catch (IOException ex) {
@@ -584,6 +669,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
            try {
              uamResponse =
                  uamPool.finishApplicationMaster(subClusterId, finishRequest);

              if (uamResponse.getIsUnregistered()) {
                secondaryRelayers.remove(subClusterId);

                if (getNMStateStore() != null) {
                  getNMStateStore().removeAMRMProxyAppContextEntry(
                      getApplicationContext().getApplicationAttemptId(),
                      NMSS_SECONDARY_SC_PREFIX + subClusterId);
                }
              }
            } catch (Throwable e) {
              LOG.warn("Failed to finish unmanaged application master: "
                  + "RM address: " + subClusterId + " ApplicationId: "
@@ -600,9 +695,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    // asynchronously by other sub-cluster resource managers, send the same
    // request to the home resource manager on this thread.
    FinishApplicationMasterResponse homeResponse =
        AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
            this.amRegistrationRequest, getApplicationContext()
                .getApplicationAttemptId().getApplicationId());
        this.homeRMRelayer.finishApplicationMaster(request);

    if (subClusterIds.size() > 0) {
      // Wait for other sub-cluster resource managers to return the
@@ -621,10 +714,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
        if (uamResponse.getResponse() == null
            || !uamResponse.getResponse().getIsUnregistered()) {
          failedToUnRegister = true;
        } else if (getNMStateStore() != null) {
          getNMStateStore().removeAMRMProxyAppContextEntry(
              getApplicationContext().getApplicationAttemptId(),
              NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId());
        }
      } catch (Throwable e) {
        failedToUnRegister = true;
@@ -689,6 +778,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    return this.registryClient;
  }

  @VisibleForTesting
  protected int getLastHomeResponseId() {
    return this.lastHomeResponseId;
  }

  /**
   * Create the UAM pool manager for secondary sub-clsuters. For unit test to
   * override.
@@ -800,6 +894,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
              getApplicationContext().getUser(), homeSubClusterId.getId(),
              amrmToken);

          secondaryRelayers.put(subClusterId.getId(),
              uamPool.getAMRMClientRelayer(subClusterId.getId()));

          response = uamPool.registerApplicationMaster(
              subClusterId.getId(), amRegistrationRequest);
@@ -1098,7 +1195,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
            token = uamPool.launchUAM(subClusterId, config,
                appContext.getApplicationAttemptId().getApplicationId(),
                amRegistrationResponse.getQueue(), appContext.getUser(),
                homeSubClusterId.toString(), registryClient != null);
                homeSubClusterId.toString(), true);

            secondaryRelayers.put(subClusterId,
                uamPool.getAMRMClientRelayer(subClusterId));

            uamResponse = uamPool.registerApplicationMaster(subClusterId,
                registerRequest);
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -533,6 +534,7 @@ public abstract class BaseAMRMProxyTest {
    capability.setMemorySize(memory);
    capability.setVirtualCores(vCores);
    req.setCapability(capability);
    req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance());
    if (labelExpression != null) {
      req.setNodeLabelExpression(labelExpression);
    }
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
@@ -516,6 +517,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
    interceptor.recover(recoveredDataMap);

    Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
    Assert.assertEquals(Integer.MAX_VALUE,
        interceptor.getLastHomeResponseId());

    // The first allocate call expects a fail-over exception and re-register
    int responseId = 10;
    AllocateRequest allocateRequest =
        Records.newRecord(AllocateRequest.class);
    allocateRequest.setResponseId(responseId);
    try {
      interceptor.allocate(allocateRequest);
      Assert.fail("Expecting an ApplicationMasterNotRegisteredException "
          + " after FederationInterceptor restarts and recovers");
    } catch (ApplicationMasterNotRegisteredException e) {
    }
    interceptor.registerApplicationMaster(registerReq);
    Assert.assertEquals(responseId, interceptor.getLastHomeResponseId());

    // Release all containers
    releaseContainersAndAssert(containers);