From 4d03915be9221ce2348a52bce44fcd5ab305df63 Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 17 Oct 2014 16:35:13 -0700 Subject: [PATCH] YARN-1879. Marked Idempotent/AtMostOnce annotations to ApplicationMasterProtocol for RM fail over. Contributed by Tsuyoshi OZAWA (cherry picked from commit c3de2412eb7633ff16c67e71e73bbe27a982d984) --- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/api/ApplicationMasterProtocol.java | 3 + .../yarn/client/ProtocolHATestBase.java | 64 ++++++++++++- .../TestApplicationMasterServiceOnHA.java | 92 ------------------- .../ApplicationMasterService.java | 16 ++-- .../yarn/server/resourcemanager/MockAM.java | 60 +++++++----- .../TestApplicationMasterService.java | 32 +++---- .../TestWorkPreservingRMRestart.java | 46 ++++++++++ 8 files changed, 178 insertions(+), 138 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b1596293cba..fa3e24334c8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -294,6 +294,9 @@ Release 2.6.0 - UNRELEASED YARN-2621. Simplify the output when the user doesn't have the access for getDomain(s). (Zhijie Shen via jianhe) + YARN-1879. Marked Idempotent/AtMostOnce annotations to ApplicationMasterProtocol + for RM fail over. (Tsuyoshi OZAWA via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java index f4ae9bef954..4d7896115e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.io.retry.AtMostOnce; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -80,6 +81,7 @@ public interface ApplicationMasterProtocol { */ @Public @Stable + @Idempotent public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException; @@ -104,6 +106,7 @@ public interface ApplicationMasterProtocol { */ @Public @Stable + @AtMostOnce public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index a0f3ceedc16..1cda9d5b2a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -18,6 +18,14 @@ package org.apache.hadoop.yarn.client; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -122,7 +130,23 @@ import org.junit.After; import org.junit.Before; -public abstract class ProtocolHATestBase extends ClientBaseWithFixes{ +/** + * Test Base for ResourceManager's Protocol on HA. + * + * Limited scope: + * For all the test cases, we only test whether the method will be re-entered + * when failover happens. Does not cover the entire logic of test. + * + * Test strategy: + * Create a separate failover thread with a trigger flag, + * override all APIs that are added trigger flag. + * When the APIs are called, we will set trigger flag as true to kick off + * the failover. So We can make sure the failover happens during process + * of the method. If this API is marked as @Idempotent or @AtMostOnce, + * the test cases will pass; otherwise, they will throw the exception. + * + */ +public abstract class ProtocolHATestBase extends ClientBaseWithFixes { protected static final HAServiceProtocol.StateChangeRequestInfo req = new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_USER); @@ -760,6 +784,43 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{ return createFakeAllocateResponse(); } + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return createFakeRegisterApplicationMasterResponse(); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + resetStartFailoverFlag(true); + // make sure failover has been triggered + Assert.assertTrue(waittingForFailOver()); + return createFakeFinishApplicationMasterResponse(); + } + } + + public RegisterApplicationMasterResponse + createFakeRegisterApplicationMasterResponse() { + Resource minCapability = Resource.newInstance(2048, 2); + Resource maxCapability = Resource.newInstance(4096, 4); + Map acls = + new HashMap(); + acls.put(ApplicationAccessType.MODIFY_APP, "*"); + ByteBuffer key = ByteBuffer.wrap("fake_key".getBytes()); + return RegisterApplicationMasterResponse.newInstance(minCapability, + maxCapability, acls, key, new ArrayList(), "root", + new ArrayList()); + } + + public FinishApplicationMasterResponse + createFakeFinishApplicationMasterResponse() { + return FinishApplicationMasterResponse.newInstance(true); } public AllocateResponse createFakeAllocateResponse() { @@ -770,4 +831,5 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{ null, new ArrayList()); } } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java deleted file mode 100644 index 5b12940ce59..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java +++ /dev/null @@ -1,92 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.client; - -import java.io.IOException; -import java.util.ArrayList; - -import org.junit.Assert; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - - -public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{ - private ApplicationMasterProtocol amClient; - private ApplicationAttemptId attemptId ; - RMAppAttempt appAttempt; - - @Before - public void initiate() throws Exception { - startHACluster(0, false, false, true); - attemptId = this.cluster.createFakeApplicationAttemptId(); - amClient = ClientRMProxy - .createRMProxy(this.conf, ApplicationMasterProtocol.class); - - Token appToken = - this.cluster.getResourceManager().getRMContext() - .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); - appToken.setService(ClientRMProxy.getAMRMTokenService(conf)); - UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser() - .getUserName())); - UserGroupInformation.getCurrentUser().addToken(appToken); - syncToken(appToken); - } - - @After - public void shutDown() { - if(this.amClient != null) { - RPC.stopProxy(this.amClient); - } - } - - @Test(timeout = 15000) - public void testAllocateOnHA() throws YarnException, IOException { - AllocateRequest request = AllocateRequest.newInstance(0, 50f, - new ArrayList(), - new ArrayList(), - ResourceBlacklistRequest.newInstance(new ArrayList(), - new ArrayList())); - AllocateResponse response = amClient.allocate(request); - Assert.assertEquals(response, this.cluster.createFakeAllocateResponse()); - } - - private void syncToken(Token token) throws IOException { - for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) { - this.cluster.getResourceManager(i).getRMContext() - .getAMRMTokenSecretManager().addPersistedPassword(token); - } - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 35baa440cfc..6a00a249e27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -343,6 +343,15 @@ public class ApplicationMasterService extends AbstractService implements authorizeRequest().getApplicationAttemptId(); ApplicationId appId = applicationAttemptId.getApplicationId(); + RMApp rmApp = + rmContext.getRMApps().get(applicationAttemptId.getApplicationId()); + // checking whether the app exits in RMStateStore at first not to throw + // ApplicationDoesNotExistInCacheException before and after + // RM work-preserving restart. + if (rmApp.isAppFinalStateStored()) { + return FinishApplicationMasterResponse.newInstance(true); + } + AllocateResponseLock lock = responseMap.get(applicationAttemptId); if (lock == null) { throwApplicationDoesNotExistInCacheException(applicationAttemptId); @@ -366,13 +375,6 @@ public class ApplicationMasterService extends AbstractService implements this.amLivelinessMonitor.receivedPing(applicationAttemptId); - RMApp rmApp = - rmContext.getRMApps().get(appId); - - if (rmApp.isAppFinalStateStored()) { - return FinishApplicationMasterResponse.newInstance(true); - } - rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptUnregistrationEvent(applicationAttemptId, request .getTrackingUrl(), request.getFinalApplicationStatus(), request diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 7af9966735d..e2e3cc13da9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -51,6 +52,7 @@ public class MockAM { private final ApplicationAttemptId attemptId; private RMContext context; private ApplicationMasterProtocol amRMProtocol; + private UserGroupInformation ugi; private final List requests = new ArrayList(); private final List releases = new ArrayList(); @@ -101,15 +103,18 @@ public class MockAM { req.setHost(""); req.setRpcPort(1); req.setTrackingUrl(""); - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(attemptId.toString()); - Token token = - context.getRMApps().get(attemptId.getApplicationId()) - .getRMAppAttempt(attemptId).getAMRMToken(); - ugi.addTokenIdentifier(token.decodeIdentifier()); + if (ugi == null) { + ugi = UserGroupInformation.createRemoteUser( + attemptId.toString()); + Token token = + context.getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + } try { return ugi - .doAs(new PrivilegedExceptionAction() { + .doAs( + new PrivilegedExceptionAction() { @Override public RegisterApplicationMasterResponse run() throws Exception { return amRMProtocol.registerApplicationMaster(req); @@ -245,10 +250,15 @@ public class MockAM { public void unregisterAppAttempt() throws Exception { waitForState(RMAppAttemptState.RUNNING); + unregisterAppAttempt(true); + } + + public void unregisterAppAttempt(boolean waitForStateRunning) + throws Exception { final FinishApplicationMasterRequest req = FinishApplicationMasterRequest.newInstance( - FinalApplicationStatus.SUCCEEDED, "", ""); - unregisterAppAttempt(req,true); + FinalApplicationStatus.SUCCEEDED, "", ""); + unregisterAppAttempt(req, waitForStateRunning); } public void unregisterAppAttempt(final FinishApplicationMasterRequest req, @@ -256,19 +266,25 @@ public class MockAM { if (waitForStateRunning) { waitForState(RMAppAttemptState.RUNNING); } - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(attemptId.toString()); - Token token = - context.getRMApps().get(attemptId.getApplicationId()) - .getRMAppAttempt(attemptId).getAMRMToken(); - ugi.addTokenIdentifier(token.decodeIdentifier()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - amRMProtocol.finishApplicationMaster(req); - return null; - } - }); + if (ugi == null) { + ugi = UserGroupInformation.createRemoteUser(attemptId.toString()); + Token token = + context.getRMApps() + .get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + } + try { + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + amRMProtocol.finishApplicationMaster(req); + return null; + } + }); + } catch (UndeclaredThrowableException e) { + throw (Exception) e.getCause(); + } } public ApplicationAttemptId getApplicationAttemptId() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index a0a50efe5cd..d1f0ede4a33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -23,9 +23,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; -import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,15 +40,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; - import org.apache.hadoop.yarn.server.utils.BuilderUtils; + import org.junit.BeforeClass; import org.junit.Test; +import org.junit.Assert; -import java.util.*; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static java.lang.Thread.sleep; -import static org.mockito.Matchers.any; public class TestApplicationMasterService { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); @@ -240,25 +242,23 @@ public class TestApplicationMasterService { FinishApplicationMasterRequest req = FinishApplicationMasterRequest.newInstance( FinalApplicationStatus.FAILED, "", ""); - Throwable cause = null; try { am1.unregisterAppAttempt(req, false); + Assert.fail("ApplicationMasterNotRegisteredException should be thrown"); + } catch (ApplicationMasterNotRegisteredException e) { + Assert.assertNotNull(e); + Assert.assertNotNull(e.getMessage()); + Assert.assertTrue(e.getMessage().contains( + "Application Master is trying to unregister before registering for:" + )); } catch (Exception e) { - cause = e.getCause(); + Assert.fail("ApplicationMasterNotRegisteredException should be thrown"); } - Assert.assertNotNull(cause); - Assert - .assertTrue(cause instanceof ApplicationMasterNotRegisteredException); - Assert.assertNotNull(cause.getMessage()); - Assert - .assertTrue(cause - .getMessage() - .contains( - "Application Master is trying to unregister before registering for:")); am1.registerAppAttempt(); am1.unregisterAppAttempt(req, false); + am1.waitForState(RMAppAttemptState.FINISHING); } finally { if (rm != null) { rm.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 2af74abfa7f..80be22ba194 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -668,6 +670,9 @@ public class TestWorkPreservingRMRestart { // create app and launch the AM RMApp app0 = rm1.submitApp(200); MockAM am0 = MockRM.launchAM(app0, rm1, nm1); + // Issuing registerAppAttempt() before and after RM restart to confirm + // registerApplicationMaster() is idempotent. + am0.registerAppAttempt(); // start new RM rm2 = new MockRM(conf, memStore); @@ -676,6 +681,7 @@ public class TestWorkPreservingRMRestart { rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + // retry registerApplicationMaster() after RM restart. am0.registerAppAttempt(true); rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); @@ -897,4 +903,44 @@ public class TestWorkPreservingRMRestart { Thread.sleep(500); } } + + /** + * Testing to confirm that retried finishApplicationMaster() doesn't throw + * InvalidApplicationMasterRequest before and after RM restart. + */ + @Test (timeout = 20000) + public void testRetriedFinishApplicationMasterRequest() + throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = MockRM.launchAM(app0, rm1, nm1); + + am0.registerAppAttempt(); + + // Emulating following a scenario: + // RM1 saves the app in RMStateStore and then crashes, + // FinishApplicationMasterResponse#isRegistered still return false, + // so AM still retry the 2nd RM + MockRM.finishAMAndVerifyAppState(app0, rm1, nm1, am0); + + + // start new RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + + am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am0.unregisterAppAttempt(false); + } + }