From cbe653f9a2358724dc7b1319ff97693bfb965944 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 11 Mar 2014 00:52:33 +0000 Subject: [PATCH] YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager after the submitApplication call goes through. Contributed by Xuan Gong. svn merge --ignore-ancestry -c 1576160 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1576162 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/api/ApplicationClientProtocol.java | 15 +- .../hadoop/yarn/client/api/YarnClient.java | 33 +++- .../yarn/client/api/impl/YarnClientImpl.java | 62 ++++---- .../TestSubmitApplicationWithRMHA.java | 141 +++++++++++++++++- 5 files changed, 214 insertions(+), 40 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 38c4f86cb2f..bf58ab9a5ff 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -266,6 +266,9 @@ Release 2.4.0 - UNRELEASED after getting an application-ID but before submission and can still submit to the newly active RM with no issues. (Xuan Gong via vinodkv) + YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager + after the submitApplication call goes through. 
(Xuan Gong via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 0abaafb0966..c475dc73e51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; @@ -58,7 +59,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; /** *

The protocol between clients and the ResourceManager @@ -107,7 +110,16 @@ public interface ApplicationClientProtocol { * {@link SubmitApplicationResponse} on accepting the submission and throws * an exception if it rejects the submission. However, this call needs to be * followed by {@link #getApplicationReport(GetApplicationReportRequest)} - * to make sure that the application gets properly submitted.

+ * to make sure that the application gets properly submitted - obtaining a + * {@link SubmitApplicationResponse} from ResourceManager doesn't guarantee + * that RM 'remembers' this application beyond failover or restart. If RM + * failover or RM restart happens before ResourceManager saves the + * application's state successfully, the subsequent + * {@link #getApplicationReport(GetApplicationReportRequest)} will throw + * an {@link ApplicationNotFoundException}. Clients need to re-submit + * the application with the same {@link ApplicationSubmissionContext} when + * they encounter the {@link ApplicationNotFoundException} on the + * {@link #getApplicationReport(GetApplicationReportRequest)} call.

* *

In secure mode,the ResourceManager verifies access to * queues etc. before accepting the application submission.

@@ -186,6 +198,7 @@ public interface ApplicationClientProtocol { */ @Public @Stable + @Idempotent public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnException, IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index 01491304336..9e27de58290 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -29,6 +29,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.Text; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -45,7 +48,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException; -import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -84,16 +87,29 @@ public abstract class YarnClient extends 
AbstractService { /** *

- * Submit a new application to YARN. It is a blocking call, such - * that it will not return {@link ApplicationId} until the submitted - * application has been submitted and accepted by the ResourceManager. + * Submit a new application to YARN. It is a blocking call - it + * will not return {@link ApplicationId} until the submitted application is + * submitted successfully and accepted by the ResourceManager. *

* *

- * Should provide an {@link ApplicationId} when submits a new application, - * otherwise, it will throw the {@link ApplicationIdNotProvidedException} + * Users should provide an {@link ApplicationId} as part of the parameter + * {@link ApplicationSubmissionContext} when submitting a new application, + * otherwise it will throw the {@link ApplicationIdNotProvidedException}. *

* + *

This internally calls {@link ApplicationClientProtocol#submitApplication + * (SubmitApplicationRequest)}, and after that, it internally invokes + * {@link ApplicationClientProtocol#getApplicationReport + * (GetApplicationReportRequest)} and waits till it can make sure that the + * application gets properly submitted. If an RM failover or RM restart + * happens before ResourceManager saves the application's state, + * {@link ApplicationClientProtocol + * #getApplicationReport(GetApplicationReportRequest)} will throw + * an {@link ApplicationNotFoundException}. This API automatically resubmits + * the application with the same {@link ApplicationSubmissionContext} when it + * catches the {@link ApplicationNotFoundException}.

+ * * @param appContext * {@link ApplicationSubmissionContext} containing all the details * needed to submit a new application @@ -102,8 +118,9 @@ public abstract class YarnClient extends AbstractService { * @throws IOException * @see #createApplication() */ - public abstract ApplicationId submitApplication(ApplicationSubmissionContext appContext) - throws YarnException, IOException; + public abstract ApplicationId submitApplication( + ApplicationSubmissionContext appContext) throws YarnException, + IOException; /** *

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index cb88fde4eb8..f7f955ef068 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -187,35 +187,43 @@ public class YarnClientImpl extends YarnClient { int pollCount = 0; long startTime = System.currentTimeMillis(); - //TODO: YARN-1764:Handle RM fail overs after the submitApplication call. while (true) { - YarnApplicationState state = - getApplicationReport(applicationId).getYarnApplicationState(); - if (!state.equals(YarnApplicationState.NEW) && - !state.equals(YarnApplicationState.NEW_SAVING)) { - LOG.info("Submitted application " + applicationId); - break; - } - - long elapsedMillis = System.currentTimeMillis() - startTime; - if (enforceAsyncAPITimeout() && - elapsedMillis >= asyncApiPollTimeoutMillis) { - throw new YarnException("Timed out while waiting for application " + - applicationId + " to be submitted successfully"); - } - - // Notify the client through the log every 10 poll, in case the client - // is blocked here too long. 
- if (++pollCount % 10 == 0) { - LOG.info("Application submission is not finished, " + - "submitted application " + applicationId + - " is still in " + state); - } try { - Thread.sleep(submitPollIntervalMillis); - } catch (InterruptedException ie) { - LOG.error("Interrupted while waiting for application " + applicationId - + " to be successfully submitted."); + YarnApplicationState state = + getApplicationReport(applicationId).getYarnApplicationState(); + if (!state.equals(YarnApplicationState.NEW) && + !state.equals(YarnApplicationState.NEW_SAVING)) { + LOG.info("Submitted application " + applicationId); + break; + } + + long elapsedMillis = System.currentTimeMillis() - startTime; + if (enforceAsyncAPITimeout() && + elapsedMillis >= asyncApiPollTimeoutMillis) { + throw new YarnException("Timed out while waiting for application " + + applicationId + " to be submitted successfully"); + } + + // Notify the client through the log every 10 poll, in case the client + // is blocked here too long. 
+ if (++pollCount % 10 == 0) { + LOG.info("Application submission is not finished, " + + "submitted application " + applicationId + + " is still in " + state); + } + try { + Thread.sleep(submitPollIntervalMillis); + } catch (InterruptedException ie) { + LOG.error("Interrupted while waiting for application " + + applicationId + + " to be successfully submitted."); + } + } catch (ApplicationNotFoundException ex) { + // FailOver or RM restart happens before RMStateStore saves + // ApplicationState + LOG.info("Re-submit application " + applicationId + "with the " + + "same ApplicationSubmissionContext"); + rmClient.submitApplication(request); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java index d02d43b4083..0c7da89f281 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSubmitApplicationWithRMHA.java @@ -24,8 +24,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.junit.Test; 
@@ -80,10 +82,141 @@ public class TestSubmitApplicationWithRMHA extends RMHATestBase{ count++; } // Verify submittion is successful - Assert.assertFalse(rm.getApplicationReport(app.getApplicationId()) - .getYarnApplicationState() == YarnApplicationState.NEW); - Assert.assertFalse(rm.getApplicationReport(app.getApplicationId()) - .getYarnApplicationState() == YarnApplicationState.NEW_SAVING); + YarnApplicationState state = + rm.getApplicationReport(app.getApplicationId()) + .getYarnApplicationState(); + Assert.assertTrue(state == YarnApplicationState.ACCEPTED + || state == YarnApplicationState.SUBMITTED); Assert.assertEquals(expectedAppId, app.getApplicationId()); } + + // There are two scenarios when RM failover happens + // after SubmitApplication Call: + // 1) RMStateStore already saved the ApplicationState when failover happens + // 2) RMStateStore did not save the ApplicationState when failover happens + + @Test + public void + testHandleRMHAafterSubmitApplicationCallWithSavedApplicationState() + throws Exception { + // Test scenario 1 when RM failover happens + // after SubmitApplication Call: + // RMStateStore already saved the ApplicationState when failover happens + startRMs(); + + // Submit Application + // After submission, the applicationState will be saved in RMStateStore. + RMApp app0 = rm1.submitApp(200); + + // Do the failover + explicitFailover(); + + // Since the applicationState has already been saved in RMStateStore + // before failover happens, the current active rm can load the previous + // applicationState. + ApplicationReport appReport = + rm2.getApplicationReport(app0.getApplicationId()); + + // verify previous submission is successful. 
+ Assert.assertTrue(appReport.getYarnApplicationState() + == YarnApplicationState.ACCEPTED || + appReport.getYarnApplicationState() + == YarnApplicationState.SUBMITTED); + } + + @Test + public void + testHandleRMHAafterSubmitApplicationCallWithoutSavedApplicationState() + throws Exception { + // Test scenario 2 when RM failover happens + // after SubmitApplication Call: + // RMStateStore did not save the ApplicationState when failover happens. + // Using customized RMAppManager. + startRMsWithCustomizedRMAppManager(); + + // Submit Application + // After submission, the applicationState will + // not be saved in RMStateStore + RMApp app0 = + rm1.submitApp(200, "", UserGroupInformation + .getCurrentUser().getShortUserName(), null, false, null, + configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + false, false); + + // Do the failover + explicitFailover(); + + // Since the applicationState is not saved in RMStateStore + // when failover happens. The current active RM can not load + // previous applicationState. + // Expect ApplicationNotFoundException by calling getApplicationReport(). 
+ try { + rm2.getApplicationReport(app0.getApplicationId()); + Assert.fail("Should get ApplicationNotFoundException here"); + } catch (ApplicationNotFoundException ex) { + // expected ApplicationNotFoundException + } + + // Submit the application with previous ApplicationId to current active RM + // This will mimic the similar behavior of YarnClient which will re-submit + // Application with previous applicationId + // when catches the ApplicationNotFoundException + RMApp app1 = + rm2.submitApp(200, "", UserGroupInformation + .getCurrentUser().getShortUserName(), null, false, null, + configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, + false, false, true, app0.getApplicationId()); + + verifySubmitApp(rm2, app1, app0.getApplicationId()); + } + + /** + * Test multiple calls of getApplicationReport, to make sure + * it is idempotent + */ + @Test + public void testGetApplicationReportIdempotent() throws Exception{ + // start two RMs, and transit rm1 to active, rm2 to standby + startRMs(); + + // Submit Application + // After submission, the applicationState will be saved in RMStateStore. 
+ RMApp app = rm1.submitApp(200); + + ApplicationReport appReport1 = + rm1.getApplicationReport(app.getApplicationId()); + Assert.assertTrue(appReport1.getYarnApplicationState() == + YarnApplicationState.ACCEPTED || + appReport1.getYarnApplicationState() == + YarnApplicationState.SUBMITTED); + + // call getApplicationReport again + ApplicationReport appReport2 = + rm1.getApplicationReport(app.getApplicationId()); + Assert.assertEquals(appReport1.getApplicationId(), + appReport2.getApplicationId()); + Assert.assertEquals(appReport1.getYarnApplicationState(), + appReport2.getYarnApplicationState()); + + // Do the failover + explicitFailover(); + + // call getApplicationReport + ApplicationReport appReport3 = + rm2.getApplicationReport(app.getApplicationId()); + Assert.assertEquals(appReport1.getApplicationId(), + appReport3.getApplicationId()); + Assert.assertEquals(appReport1.getYarnApplicationState(), + appReport3.getYarnApplicationState()); + + // call getApplicationReport again + ApplicationReport appReport4 = + rm2.getApplicationReport(app.getApplicationId()); + Assert.assertEquals(appReport3.getApplicationId(), + appReport4.getApplicationId()); + Assert.assertEquals(appReport3.getYarnApplicationState(), + appReport4.getYarnApplicationState()); + } }