From 57caab6f9b66d52d6df2d95e21e8854398489631 Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Mon, 1 Oct 2018 13:12:38 -0700 Subject: [PATCH] YARN-8760. [AMRMProxy] Fix concurrent re-register due to YarnRM failover in AMRMClientRelayer. Contributed by Botong Huang. --- .../hadoop/yarn/server/AMRMClientRelayer.java | 25 +++++++++++++++++-- .../yarn/server/TestAMRMClientRelayer.java | 25 +++++++++++++++++++ .../amrmproxy/FederationInterceptor.java | 12 +++++++++ 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java index a7ed373e98e..790147c5987 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java @@ -232,6 +232,27 @@ public class AMRMClientRelayer extends AbstractService return this.rmClient.registerApplicationMaster(request); } + /** + * After an RM failover, there might be more than one + * allocate/finishApplicationMaster call thread (due to RPC timeout and retry) + * doing the auto re-register concurrently. As a result, we need to swallow + * the already register exception thrown by the new RM. + */ + private void reRegisterApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + try { + registerApplicationMaster(request); + } catch (InvalidApplicationMasterRequestException e) { + if (e.getMessage() + .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) { + LOG.info("Concurrent thread successfully re-registered, moving on."); + } else { + throw e; + } + } + } + @Override public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) @@ -242,7 +263,7 @@ public class AMRMClientRelayer extends AbstractService LOG.warn("Out of sync with RM " + rmId + " for " + this.appId + ", hence resyncing."); // re register with RM - registerApplicationMaster(this.amRegistrationRequest); + reRegisterApplicationMaster(this.amRegistrationRequest); return finishApplicationMaster(request); } } @@ -363,7 +384,7 @@ public class AMRMClientRelayer extends AbstractService } // re-register with RM, then retry allocate recursively - registerApplicationMaster(this.amRegistrationRequest); + reRegisterApplicationMaster(this.amRegistrationRequest); // Reset responseId after re-register allocateRequest.setResponseId(0); allocateResponse = allocate(allocateRequest); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java index 2c016d769fe..fa46960ee7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java @@ -64,6 +64,11 @@ public class TestAMRMClientRelayer { // Whether this mockRM will throw failover exception upon next heartbeat // from AM private boolean failover = false; + + // Whether this mockRM will throw application already registered exception + // upon next registerApplicationMaster call + private boolean throwAlreadyRegister = false; + private int responseIdReset = -1; private List lastAsk; private List lastRelease; @@ -74,6 +79,11 @@ public class TestAMRMClientRelayer { public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { + if (this.throwAlreadyRegister) { + this.throwAlreadyRegister = false; + throw new InvalidApplicationMasterRequestException( + AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + "appId"); + } return null; } @@ -118,6 +128,10 @@ public class TestAMRMClientRelayer { this.failover = true; } + public void setThrowAlreadyRegister() { + this.throwAlreadyRegister = true; + } + public void setResponseIdReset(int expectedResponseId) { this.responseIdReset = expectedResponseId; } @@ -315,4 +329,15 @@ public class TestAMRMClientRelayer { response = this.relayer.allocate(getAllocateRequest()); Assert.assertEquals(this.responseId + 1, response.getResponseId()); } + + @Test + public void testConcurrentReregister() throws YarnException, IOException { + + // Set RM restart and failover flag + this.mockAMS.setFailoverFlag(); + + this.mockAMS.setThrowAlreadyRegister(); + + relayer.finishApplicationMaster(null); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index c02296de1fa..426794580d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -184,6 +184,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private volatile boolean justRecovered; + /** if true, allocate will be no-op, skipping actual processing. */ + private volatile boolean finishAMCalled; + /** * Used to keep track of the container Id and the sub cluster RM that created * the container, so that we know which sub-cluster to forward later requests @@ -230,6 +233,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.amRegistrationRequest = null; this.amRegistrationResponse = null; this.justRecovered = false; + this.finishAMCalled = false; } /** @@ -576,6 +580,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { + ". AM should re-register and full re-send pending requests."); } + if (this.finishAMCalled) { + LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat " + + "processing and return dummy response" + this.attemptId); + return RECORD_FACTORY.newRecordInstance(AllocateResponse.class); + } + // Check responseId and handle duplicate heartbeat exactly same as RM synchronized (this.lastAllocateResponseLock) { LOG.info("Heartbeat from " + this.attemptId + " with responseId " @@ -664,6 +674,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { FinishApplicationMasterRequest request) throws YarnException, IOException { + this.finishAMCalled = true; + // TODO: consider adding batchFinishApplicationMaster in UAMPoolManager boolean failedToUnRegister = false; ExecutorCompletionService compSvc =