From aab9bfc13c3d88e3b065e2f3e471b3e0fb0afb28 Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Mon, 9 Jul 2018 16:47:44 -0700 Subject: [PATCH] YARN-7899. [AMRMProxy] Stateful FederationInterceptor for pending requests. Contributed by Botong Huang. --- .../hadoop/yarn/client/AMRMClientUtils.java | 91 ----------- .../hadoop/yarn/server/AMRMClientRelayer.java | 9 +- .../server/uam/UnmanagedAMPoolManager.java | 16 ++ .../uam/UnmanagedApplicationManager.java | 40 +++-- .../server/MockResourceManagerFacade.java | 13 +- .../amrmproxy/FederationInterceptor.java | 146 +++++++++++++++--- .../amrmproxy/BaseAMRMProxyTest.java | 2 + .../amrmproxy/TestFederationInterceptor.java | 17 ++ 8 files changed, 192 insertions(+), 142 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java index 7f6248599be..06948bb8d6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java @@ -30,17 +30,7 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; -import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,87 +49,6 @@ public final class AMRMClientUtils { private AMRMClientUtils() { } - /** - * Handle ApplicationNotRegistered exception and re-register. - * - * @param appId application Id - * @param rmProxy RM proxy instance - * @param registerRequest the AM re-register request - * @throws YarnException if re-register fails - */ - public static void handleNotRegisteredExceptionAndReRegister( - ApplicationId appId, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest) throws YarnException { - LOG.info("App attempt {} not registered, most likely due to RM failover. " - + " Trying to re-register.", appId); - try { - rmProxy.registerApplicationMaster(registerRequest); - } catch (Exception e) { - if (e instanceof InvalidApplicationMasterRequestException - && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) { - LOG.info("Concurrent thread successfully registered, moving on."); - } else { - LOG.error("Error trying to re-register AM", e); - throw new YarnException(e); - } - } - } - - /** - * Helper method for client calling ApplicationMasterProtocol.allocate that - * handles re-register if RM fails over. - * - * @param request allocate request - * @param rmProxy RM proxy - * @param registerRequest the register request for re-register - * @param appId application id - * @return allocate response - * @throws YarnException if RM call fails - * @throws IOException if RM call fails - */ - public static AllocateResponse allocateWithReRegister(AllocateRequest request, - ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, ApplicationId appId) - throws YarnException, IOException { - try { - return rmProxy.allocate(request); - } catch (ApplicationMasterNotRegisteredException e) { - handleNotRegisteredExceptionAndReRegister(appId, rmProxy, - registerRequest); - // reset responseId after re-register - request.setResponseId(0); - // retry allocate - return allocateWithReRegister(request, rmProxy, registerRequest, appId); - } - } - - /** - * Helper method for client calling - * ApplicationMasterProtocol.finishApplicationMaster that handles re-register - * if RM fails over. - * - * @param request finishApplicationMaster request - * @param rmProxy RM proxy - * @param registerRequest the register request for re-register - * @param appId application id - * @return finishApplicationMaster response - * @throws YarnException if RM call fails - * @throws IOException if RM call fails - */ - public static FinishApplicationMasterResponse finishAMWithReRegister( - FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, ApplicationId appId) - throws YarnException, IOException { - try { - return rmProxy.finishApplicationMaster(request); - } catch (ApplicationMasterNotRegisteredException ex) { - handleNotRegisteredExceptionAndReRegister(appId, rmProxy, - registerRequest); - // retry finishAM after re-register - return finishAMWithReRegister(request, rmProxy, registerRequest, appId); - } - } - /** * Create a proxy for the specified protocol. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java index 2e7c184b9fa..21cb55f7b89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java @@ -141,6 +141,11 @@ public class AMRMClientRelayer extends AbstractService super.serviceStop(); } + public void setAMRegistrationRequest( + RegisterApplicationMasterRequest registerRequest) { + this.amRegistrationRequest = registerRequest; + } + @Override public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) @@ -239,8 +244,10 @@ public class AMRMClientRelayer extends AbstractService this.change.putAll(this.remotePendingChange); } - // re register with RM, then retry allocate recursively + // re-register with RM, then retry allocate recursively registerApplicationMaster(this.amRegistrationRequest); + // Reset responseId after re-register + allocateRequest.setResponseId(0); return allocate(allocateRequest); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 4d679ef3606..eea163f0771 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -384,4 +385,19 @@ public class UnmanagedAMPoolManager extends AbstractService { return this.unmanagedAppMasterMap.containsKey(uamId); } + /** + * Return the rmProxy relayer of an UAM. + * + * @param uamId uam Id + * @return the rmProxy relayer + * @throws YarnException if fails + */ + public AMRMClientRelayer getAMRMClientRelayer(String uamId) + throws YarnException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 10985e0417b..52bde8c0503 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; @@ -90,7 +91,7 @@ public class UnmanagedApplicationManager { private BlockingQueue requestQueue; private AMRequestHandlerThread handlerThread; - private ApplicationMasterProtocol rmProxy; + private AMRMClientRelayer rmProxyRelayer; private ApplicationId applicationId; private String submitter; private String appNameSuffix; @@ -138,7 +139,7 @@ public class UnmanagedApplicationManager { this.appNameSuffix = appNameSuffix; this.handlerThread = new AMRequestHandlerThread(); this.requestQueue = new LinkedBlockingQueue<>(); - this.rmProxy = null; + this.rmProxyRelayer = null; this.connectionInitiated = false; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); @@ -190,8 +191,9 @@ public class UnmanagedApplicationManager { throws IOException { this.userUgi = UserGroupInformation.createProxyUser( this.applicationId.toString(), UserGroupInformation.getCurrentUser()); - this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, - this.userUgi, amrmToken); + this.rmProxyRelayer = + new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class, + this.conf, this.userUgi, amrmToken)); } /** @@ -209,19 +211,18 @@ public class UnmanagedApplicationManager { // Save the register request for re-register later this.registerRequest = request; - // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM. - // We do not expect application already registered exception here LOG.info("Registering the Unmanaged application master {}", this.applicationId); RegisterApplicationMasterResponse response = - this.rmProxy.registerApplicationMaster(this.registerRequest); + this.rmProxyRelayer.registerApplicationMaster(this.registerRequest); + this.lastResponseId = 0; for (Container container : response.getContainersFromPreviousAttempts()) { - LOG.info("RegisterUAM returned existing running container " + LOG.debug("RegisterUAM returned existing running container " + container.getId()); } for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { - LOG.info("RegisterUAM returned existing NM token for node " + LOG.debug("RegisterUAM returned existing NM token for node " + nmToken.getNodeId()); } @@ -249,7 +250,7 @@ public class UnmanagedApplicationManager { this.handlerThread.shutdown(); - if (this.rmProxy == null) { + if (this.rmProxyRelayer == null) { if (this.connectionInitiated) { // This is possible if the async launchUAM is still // blocked and retrying. Return a dummy response in this case. @@ -261,8 +262,7 @@ public class UnmanagedApplicationManager { + "be called before createAndRegister"); } } - return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy, - this.registerRequest, this.applicationId); + return this.rmProxyRelayer.finishApplicationMaster(request); } /** @@ -308,7 +308,7 @@ public class UnmanagedApplicationManager { // // In case 2, we have already save the allocate request above, so if the // registration succeed later, no request is lost. - if (this.rmProxy == null) { + if (this.rmProxyRelayer == null) { if (this.connectionInitiated) { LOG.info("Unmanaged AM still not successfully launched/registered yet." + " Saving the allocate request and send later."); @@ -328,6 +328,15 @@ public class UnmanagedApplicationManager { return this.applicationId; } + /** + * Returns the rmProxy relayer of this UAM. + * + * @return rmProxy relayer of the UAM + */ + public AMRMClientRelayer getAMRMClientRelayer() { + return this.rmProxyRelayer; + } + /** * Returns RM proxy for the specified protocol type. Unit test cases can * override this method and return mock proxy instances. @@ -592,10 +601,7 @@ public class UnmanagedApplicationManager { } request.setResponseId(lastResponseId); - - AllocateResponse response = AMRMClientUtils.allocateWithReRegister( - request, rmProxy, registerRequest, applicationId); - + AllocateResponse response = rmProxyRelayer.allocate(request); if (response == null) { throw new YarnException("Null allocateResponse from allocate"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index f8b96c39ea4..97e38664ca5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -245,8 +245,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, ApplicationAttemptId attemptId = getAppIdentifier(); LOG.info("Registering application attempt: " + attemptId); - shouldReRegisterNext = false; - List containersFromPreviousAttempt = null; synchronized (applicationContainerIdMap) { @@ -260,7 +258,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, containersFromPreviousAttempt.add(Container.newInstance(containerId, null, null, null, null, null)); } - } else { + } else if (!shouldReRegisterNext) { throw new InvalidApplicationMasterRequestException( AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); } @@ -270,6 +268,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } } + shouldReRegisterNext = false; + // Make sure we wait for certain test cases last in the method synchronized (syncObj) { syncObj.notifyAll(); @@ -333,13 +333,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, validateRunning(); - if (request.getAskList() != null && request.getAskList().size() > 0 - && request.getReleaseList() != null - && request.getReleaseList().size() > 0) { - Assert.fail("The mock RM implementation does not support receiving " - + "askList and releaseList in the same heartbeat"); - } - ApplicationAttemptId attemptId = getAppIdentifier(); LOG.info("Allocate from application attempt: " + attemptId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 57407490c4c..645e47e5af0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -62,14 +62,15 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; -import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; @@ -106,9 +107,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { public static final String NMSS_REG_RESPONSE_KEY = NMSS_CLASS_PREFIX + "registerResponse"; - /* + /** * When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn - * Registry. Otherwise if NM recovery is enabled, the UAM token are store in + * Registry. Otherwise if NM recovery is enabled, the UAM token are stored in * local NMSS instead under this directory name. */ public static final String NMSS_SECONDARY_SC_PREFIX = @@ -119,8 +120,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor { * The home sub-cluster is the sub-cluster where the AM container is running * in. */ - private ApplicationMasterProtocol homeRM; + private AMRMClientRelayer homeRMRelayer; private SubClusterId homeSubClusterId; + private volatile int lastHomeResponseId; + + /** + * A flag for work preserving NM restart. If we just recovered, we need to + * generate an {@link ApplicationMasterNotRegisteredException} exception back + * to AM (similar to what RM will do after its restart/fail-over) in its next + * allocate to trigger AM re-register (which we will shield from RM and just + * return our saved register response) and a full pending requests re-send, so + * that all the {@link AMRMClientRelayer} will be re-populated with all + * pending requests. + * + * TODO: When split-merge is not idempotent, this can lead to some + * over-allocation without a full cancel to RM. + */ + private volatile boolean justRecovered; /** * UAM pool for secondary sub-clusters (ones other than home sub-cluster), @@ -134,6 +150,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private UnmanagedAMPoolManager uamPool; + /** + * The rmProxy relayers for secondary sub-clusters that keep track of all + * pending requests. + */ + private Map secondaryRelayers; + /** Thread pool used for asynchronous operations. */ private ExecutorService threadpool; @@ -186,8 +208,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.asyncResponseSink = new ConcurrentHashMap<>(); this.threadpool = Executors.newCachedThreadPool(); this.uamPool = createUnmanagedAMPoolManager(this.threadpool); + this.secondaryRelayers = new ConcurrentHashMap<>(); this.amRegistrationRequest = null; this.amRegistrationResponse = null; + this.lastHomeResponseId = Integer.MAX_VALUE; + this.justRecovered = false; } /** @@ -224,8 +249,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); - this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class, - this.appOwner); + this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext, + ApplicationMasterProtocol.class, this.appOwner)); this.federationFacade = FederationStateStoreFacade.getInstance(); this.subClusterResolver = this.federationFacade.getSubClusterResolver(); @@ -240,13 +265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { @Override public void recover(Map recoveredDataMap) { super.recover(recoveredDataMap); - LOG.info("Recovering data for FederationInterceptor"); + ApplicationAttemptId attemptId = + getApplicationContext().getApplicationAttemptId(); + LOG.info("Recovering data for FederationInterceptor for {}", attemptId); if (recoveredDataMap == null) { return; } - - ApplicationAttemptId attemptId = - getApplicationContext().getApplicationAttemptId(); try { if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) { RegisterApplicationMasterRequestProto pb = @@ -255,6 +279,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.amRegistrationRequest = new RegisterApplicationMasterRequestPBImpl(pb); LOG.info("amRegistrationRequest recovered for {}", attemptId); + + // Give the register request to homeRMRelayer for future re-registration + this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest); } if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) { RegisterApplicationMasterResponseProto pb = @@ -263,6 +290,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.amRegistrationResponse = new RegisterApplicationMasterResponsePBImpl(pb); LOG.info("amRegistrationResponse recovered for {}", attemptId); + // Trigger re-register and full pending re-send only if we have a + // saved register response. This should always be true though. + this.justRecovered = true; } // Recover UAM amrmTokens from registry or NMSS @@ -309,6 +339,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { getApplicationContext().getUser(), this.homeSubClusterId.getId(), entry.getValue()); + this.secondaryRelayers.put(subClusterId.getId(), + this.uamPool.getAMRMClientRelayer(subClusterId.getId())); + RegisterApplicationMasterResponse response = this.uamPool.registerApplicationMaster(subClusterId.getId(), this.amRegistrationRequest); @@ -436,7 +469,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { * the other sub-cluster RM will be done lazily as needed later. */ this.amRegistrationResponse = - this.homeRM.registerApplicationMaster(request); + this.homeRMRelayer.registerApplicationMaster(request); if (this.amRegistrationResponse .getContainersFromPreviousAttempts() != null) { cacheAllocatedContainers( @@ -495,6 +528,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor { Preconditions.checkArgument(this.policyInterpreter != null, "Allocate should be called after registerApplicationMaster"); + if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) { + // Save the responseId home RM is expecting + this.lastHomeResponseId = request.getResponseId(); + + throw new ApplicationMasterNotRegisteredException( + "AMRMProxy just restarted and recovered for " + + getApplicationContext().getApplicationAttemptId() + + ". AM should re-register and full re-send pending requests."); + } + + // Override responseId in the request in two cases: + // + // 1. After we just recovered after an NM restart and AM's responseId is + // reset due to the exception we generate. We need to override the + // responseId to the one homeRM expects. + // + // 2. After homeRM fail-over, the allocate response with reseted responseId + // might not be returned successfully back to AM because of RPC connection + // timeout between AM and AMRMProxy. In this case, we remember and reset the + // responseId for AM. + if (this.justRecovered + || request.getResponseId() > this.lastHomeResponseId) { + LOG.warn("Setting allocate responseId for {} from {} to {}", + getApplicationContext().getApplicationAttemptId(), + request.getResponseId(), this.lastHomeResponseId); + request.setResponseId(this.lastHomeResponseId); + } + try { // Split the heart beat request into multiple requests, one for each // sub-cluster RM that is used by this application. @@ -509,10 +570,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor { sendRequestsToSecondaryResourceManagers(requests); // Send the request to the home RM and get the response - AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister( - requests.get(this.homeSubClusterId), this.homeRM, - this.amRegistrationRequest, - getApplicationContext().getApplicationAttemptId().getApplicationId()); + AllocateRequest homeRequest = requests.get(this.homeSubClusterId); + LOG.info("{} heartbeating to home RM with responseId {}", + getApplicationContext().getApplicationAttemptId(), + homeRequest.getResponseId()); + + AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest); + + // Reset the flag after the first successful homeRM allocate response, + // otherwise keep overriding the responseId of new allocate request + if (this.justRecovered) { + this.justRecovered = false; + } // Notify policy of home response try { @@ -540,6 +609,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor { newRegistrations.getSuccessfulRegistrations()); } + LOG.info("{} heartbeat response from home RM with responseId {}", + getApplicationContext().getApplicationAttemptId(), + homeResponse.getResponseId()); + + // Update lastHomeResponseId in three cases: + // 1. The normal responseId increments + // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails + // over, AMRMClientRelayer auto re-register and full re-send for homeRM. + // 3. lastHomeResponseId == MAX_INT. This is the initial case or + // responseId about to overflow and wrap around + if (homeResponse.getResponseId() == this.lastHomeResponseId + 1 + || homeResponse.getResponseId() == 1 + || this.lastHomeResponseId == Integer.MAX_VALUE) { + this.lastHomeResponseId = homeResponse.getResponseId(); + } + // return the final response to the application master. return homeResponse; } catch (IOException ex) { @@ -584,6 +669,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor { try { uamResponse = uamPool.finishApplicationMaster(subClusterId, finishRequest); + + if (uamResponse.getIsUnregistered()) { + secondaryRelayers.remove(subClusterId); + + if (getNMStateStore() != null) { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId); + } + } } catch (Throwable e) { LOG.warn("Failed to finish unmanaged application master: " + "RM address: " + subClusterId + " ApplicationId: " @@ -600,9 +695,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // asynchronously by other sub-cluster resource managers, send the same // request to the home resource manager on this thread. FinishApplicationMasterResponse homeResponse = - AMRMClientUtils.finishAMWithReRegister(request, this.homeRM, - this.amRegistrationRequest, getApplicationContext() - .getApplicationAttemptId().getApplicationId()); + this.homeRMRelayer.finishApplicationMaster(request); if (subClusterIds.size() > 0) { // Wait for other sub-cluster resource managers to return the @@ -621,10 +714,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (uamResponse.getResponse() == null || !uamResponse.getResponse().getIsUnregistered()) { failedToUnRegister = true; - } else if (getNMStateStore() != null) { - getNMStateStore().removeAMRMProxyAppContextEntry( - getApplicationContext().getApplicationAttemptId(), - NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId()); } } catch (Throwable e) { failedToUnRegister = true; @@ -689,6 +778,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { return this.registryClient; } + @VisibleForTesting + protected int getLastHomeResponseId() { + return this.lastHomeResponseId; + } + /** * Create the UAM pool manager for secondary sub-clsuters. For unit test to * override. @@ -800,6 +894,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { getApplicationContext().getUser(), homeSubClusterId.getId(), amrmToken); + secondaryRelayers.put(subClusterId.getId(), + uamPool.getAMRMClientRelayer(subClusterId.getId())); + response = uamPool.registerApplicationMaster( subClusterId.getId(), amRegistrationRequest); @@ -1098,7 +1195,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { token = uamPool.launchUAM(subClusterId, config, appContext.getApplicationAttemptId().getApplicationId(), amRegistrationResponse.getQueue(), appContext.getUser(), - homeSubClusterId.toString(), registryClient != null); + homeSubClusterId.toString(), true); + + secondaryRelayers.put(subClusterId, + uamPool.getAMRMClientRelayer(subClusterId)); uamResponse = uamPool.registerApplicationMaster(subClusterId, registerRequest); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 0958191d436..3c432d30338 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -533,6 +534,7 @@ public abstract class BaseAMRMProxyTest { capability.setMemorySize(memory); capability.setVirtualCores(vCores); req.setCapability(capability); + req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance()); if (labelExpression != null) { req.setNodeLabelExpression(labelExpression); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 5a1a657a34e..9950b92d6ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; @@ -516,6 +517,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { interceptor.recover(recoveredDataMap); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(Integer.MAX_VALUE, + interceptor.getLastHomeResponseId()); + + // The first allocate call expects a fail-over exception and re-register + int responseId = 10; + AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(responseId); + try { + interceptor.allocate(allocateRequest); + Assert.fail("Expecting an ApplicationMasterNotRegisteredException " + + " after FederationInterceptor restarts and recovers"); + } catch (ApplicationMasterNotRegisteredException e) { + } + interceptor.registerApplicationMaster(registerReq); + Assert.assertEquals(responseId, interceptor.getLastHomeResponseId()); // Release all containers releaseContainersAndAssert(containers);