YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager after the submitApplication call goes through. Contributed by Xuan Gong.

svn merge --ignore-ancestry -c 1576160 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1576162 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-11 00:52:33 +00:00
parent 0f7d99268c
commit cbe653f9a2
5 changed files with 214 additions and 40 deletions

View File

@ -266,6 +266,9 @@ Release 2.4.0 - UNRELEASED
after getting an application-ID but before submission and can still submit to after getting an application-ID but before submission and can still submit to
the newly active RM with no issues. (Xuan Gong via vinodkv) the newly active RM with no issues. (Xuan Gong via vinodkv)
YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager
after the submitApplication call goes through. (Xuan Gong via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@ -58,7 +59,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
/** /**
* <p>The protocol between clients and the <code>ResourceManager</code> * <p>The protocol between clients and the <code>ResourceManager</code>
@ -107,7 +110,16 @@ public interface ApplicationClientProtocol {
* {@link SubmitApplicationResponse} on accepting the submission and throws * {@link SubmitApplicationResponse} on accepting the submission and throws
* an exception if it rejects the submission. However, this call needs to be * an exception if it rejects the submission. However, this call needs to be
* followed by {@link #getApplicationReport(GetApplicationReportRequest)} * followed by {@link #getApplicationReport(GetApplicationReportRequest)}
* to make sure that the application gets properly submitted.</p> * to make sure that the application gets properly submitted - obtaining a
* {@link SubmitApplicationResponse} from ResourceManager doesn't guarantee
* that RM 'remembers' this application beyond failover or restart. If RM
* failover or RM restart happens before ResourceManager saves the
* application's state successfully, the subsequent
* {@link #getApplicationReport(GetApplicationReportRequest)} will throw
* an {@link ApplicationNotFoundException}. Clients need to re-submit
* the application with the same {@link ApplicationSubmissionContext} when
* they encounter the {@link ApplicationNotFoundException} on the
* {@link #getApplicationReport(GetApplicationReportRequest)} call.</p>
* *
* <p> In secure mode,the <code>ResourceManager</code> verifies access to * <p> In secure mode,the <code>ResourceManager</code> verifies access to
* queues etc. before accepting the application submission.</p> * queues etc. before accepting the application submission.</p>
@ -186,6 +198,7 @@ public interface ApplicationClientProtocol {
*/ */
@Public @Public
@Stable @Stable
@Idempotent
public GetApplicationReportResponse getApplicationReport( public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) GetApplicationReportRequest request)
throws YarnException, IOException; throws YarnException, IOException;

View File

@ -29,6 +29,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -45,7 +48,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException; import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@ -84,16 +87,29 @@ public abstract class YarnClient extends AbstractService {
/** /**
* <p> * <p>
* Submit a new application to <code>YARN.</code> It is a blocking call, such * Submit a new application to <code>YARN.</code> It is a blocking call - it
* that it will not return {@link ApplicationId} until the submitted * will not return {@link ApplicationId} until the submitted application is
* application has been submitted and accepted by the ResourceManager. * submitted successfully and accepted by the ResourceManager.
* </p> * </p>
* *
* <p> * <p>
* Should provide an {@link ApplicationId} when submits a new application, * Users should provide an {@link ApplicationId} as part of the parameter
* otherwise, it will throw the {@link ApplicationIdNotProvidedException} * {@link ApplicationSubmissionContext} when submitting a new application,
* otherwise it will throw the {@link ApplicationIdNotProvidedException}.
* </p> * </p>
* *
* <p>This internally calls {@link ApplicationClientProtocol#submitApplication
* (SubmitApplicationRequest)}, and after that, it internally invokes
* {@link ApplicationClientProtocol#getApplicationReport
* (GetApplicationReportRequest)} and waits till it can make sure that the
* application gets properly submitted. If RM fails over or RM restart
* happens before ResourceManager saves the application's state,
* {@link ApplicationClientProtocol
* #getApplicationReport(GetApplicationReportRequest)} will throw
* the {@link ApplicationNotFoundException}. This API automatically resubmits
* the application with the same {@link ApplicationSubmissionContext} when it
* catches the {@link ApplicationNotFoundException}.</p>
*
* @param appContext * @param appContext
* {@link ApplicationSubmissionContext} containing all the details * {@link ApplicationSubmissionContext} containing all the details
* needed to submit a new application * needed to submit a new application
@ -102,8 +118,9 @@ public abstract class YarnClient extends AbstractService {
* @throws IOException * @throws IOException
* @see #createApplication() * @see #createApplication()
*/ */
public abstract ApplicationId submitApplication(ApplicationSubmissionContext appContext) public abstract ApplicationId submitApplication(
throws YarnException, IOException; ApplicationSubmissionContext appContext) throws YarnException,
IOException;
/** /**
* <p> * <p>

View File

@ -187,35 +187,43 @@ public class YarnClientImpl extends YarnClient {
int pollCount = 0; int pollCount = 0;
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
//TODO: YARN-1764:Handle RM fail overs after the submitApplication call.
while (true) { while (true) {
YarnApplicationState state =
getApplicationReport(applicationId).getYarnApplicationState();
if (!state.equals(YarnApplicationState.NEW) &&
!state.equals(YarnApplicationState.NEW_SAVING)) {
LOG.info("Submitted application " + applicationId);
break;
}
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be submitted successfully");
}
// Notify the client through the log every 10 poll, in case the client
// is blocked here too long.
if (++pollCount % 10 == 0) {
LOG.info("Application submission is not finished, " +
"submitted application " + applicationId +
" is still in " + state);
}
try { try {
Thread.sleep(submitPollIntervalMillis); YarnApplicationState state =
} catch (InterruptedException ie) { getApplicationReport(applicationId).getYarnApplicationState();
LOG.error("Interrupted while waiting for application " + applicationId if (!state.equals(YarnApplicationState.NEW) &&
+ " to be successfully submitted."); !state.equals(YarnApplicationState.NEW_SAVING)) {
LOG.info("Submitted application " + applicationId);
break;
}
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be submitted successfully");
}
// Notify the client through the log every 10 poll, in case the client
// is blocked here too long.
if (++pollCount % 10 == 0) {
LOG.info("Application submission is not finished, " +
"submitted application " + applicationId +
" is still in " + state);
}
try {
Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) {
LOG.error("Interrupted while waiting for application "
+ applicationId
+ " to be successfully submitted.");
}
} catch (ApplicationNotFoundException ex) {
// FailOver or RM restart happens before RMStateStore saves
// ApplicationState
LOG.info("Re-submit application " + applicationId + "with the " +
"same ApplicationSubmissionContext");
rmClient.submitApplication(request);
} }
} }

View File

@ -24,8 +24,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.junit.Test; import org.junit.Test;
@ -80,10 +82,141 @@ public class TestSubmitApplicationWithRMHA extends RMHATestBase{
count++; count++;
} }
// Verify submission is successful // Verify submission is successful
Assert.assertFalse(rm.getApplicationReport(app.getApplicationId()) YarnApplicationState state =
.getYarnApplicationState() == YarnApplicationState.NEW); rm.getApplicationReport(app.getApplicationId())
Assert.assertFalse(rm.getApplicationReport(app.getApplicationId()) .getYarnApplicationState();
.getYarnApplicationState() == YarnApplicationState.NEW_SAVING); Assert.assertTrue(state == YarnApplicationState.ACCEPTED
|| state == YarnApplicationState.SUBMITTED);
Assert.assertEquals(expectedAppId, app.getApplicationId()); Assert.assertEquals(expectedAppId, app.getApplicationId());
} }
// There are two scenarios when RM failover happens
// after SubmitApplication Call:
// 1) RMStateStore already saved the ApplicationState when failover happens
// 2) RMStateStore did not save the ApplicationState when failover happens
@Test
public void
    testHandleRMHAafterSubmitApplicationCallWithSavedApplicationState()
    throws Exception {
  // Scenario 1 for an RM failover that follows the submitApplication call:
  // the ApplicationState has already been saved in RMStateStore when the
  // failover happens.
  startRMs();

  // Submit through rm1; after submission the applicationState is saved in
  // RMStateStore. Then trigger the failover.
  RMApp submittedApp = rm1.submitApp(200);
  explicitFailover();

  // Because the state was saved before the failover, the currently active
  // RM is expected to load it and answer the report request.
  YarnApplicationState stateAfterFailover =
      rm2.getApplicationReport(submittedApp.getApplicationId())
          .getYarnApplicationState();

  // The earlier submission must still be seen as successful.
  Assert.assertTrue(stateAfterFailover == YarnApplicationState.ACCEPTED
      || stateAfterFailover == YarnApplicationState.SUBMITTED);
}
@Test
public void
    testHandleRMHAafterSubmitApplicationCallWithoutSavedApplicationState()
    throws Exception {
  // Scenario 2 for an RM failover that follows the submitApplication call:
  // RMStateStore has NOT saved the ApplicationState when the failover
  // happens. A customized RMAppManager is used to get into that situation.
  startRMsWithCustomizedRMAppManager();

  // Submit through rm1; with this setup the applicationState will not be
  // saved in RMStateStore.
  RMApp lostApp =
      rm1.submitApp(200, "",
          UserGroupInformation.getCurrentUser().getShortUserName(),
          null, false, null,
          configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
              YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS),
          null, null, false, false);
  ApplicationId lostAppId = lostApp.getApplicationId();

  explicitFailover();

  // Nothing was saved, so the currently active RM cannot load the previous
  // applicationState; getApplicationReport() is expected to throw
  // ApplicationNotFoundException.
  try {
    rm2.getApplicationReport(lostAppId);
    Assert.fail("Should get ApplicationNotFoundException here");
  } catch (ApplicationNotFoundException ex) {
    // expected ApplicationNotFoundException
  }

  // Mimic YarnClient, which re-submits the application with the previous
  // ApplicationId once it catches the ApplicationNotFoundException.
  RMApp resubmittedApp =
      rm2.submitApp(200, "",
          UserGroupInformation.getCurrentUser().getShortUserName(),
          null, false, null,
          configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
              YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS),
          null, null, false, false, true, lostAppId);

  verifySubmitApp(rm2, resubmittedApp, lostAppId);
}
/**
 * Calls getApplicationReport several times, before and after a failover,
 * and checks that the reports agree, to make sure the call is idempotent.
 */
@Test
public void testGetApplicationReportIdempotent() throws Exception{
  // Start two RMs: rm1 is transitioned to active, rm2 to standby.
  startRMs();

  // Submit the application; after submission the applicationState is saved
  // in RMStateStore.
  RMApp app = rm1.submitApp(200);

  // First report from rm1: the application must be ACCEPTED or SUBMITTED.
  ApplicationReport firstOnRm1 =
      rm1.getApplicationReport(app.getApplicationId());
  YarnApplicationState initialState = firstOnRm1.getYarnApplicationState();
  Assert.assertTrue(initialState == YarnApplicationState.ACCEPTED
      || initialState == YarnApplicationState.SUBMITTED);

  // Second report from rm1 must match the first one.
  ApplicationReport secondOnRm1 =
      rm1.getApplicationReport(app.getApplicationId());
  Assert.assertEquals(firstOnRm1.getApplicationId(),
      secondOnRm1.getApplicationId());
  Assert.assertEquals(firstOnRm1.getYarnApplicationState(),
      secondOnRm1.getYarnApplicationState());

  // Fail over.
  explicitFailover();

  // First report from rm2 must match what rm1 reported before the failover.
  ApplicationReport firstOnRm2 =
      rm2.getApplicationReport(app.getApplicationId());
  Assert.assertEquals(firstOnRm1.getApplicationId(),
      firstOnRm2.getApplicationId());
  Assert.assertEquals(firstOnRm1.getYarnApplicationState(),
      firstOnRm2.getYarnApplicationState());

  // Second report from rm2 must match the previous rm2 report.
  ApplicationReport secondOnRm2 =
      rm2.getApplicationReport(app.getApplicationId());
  Assert.assertEquals(firstOnRm2.getApplicationId(),
      secondOnRm2.getApplicationId());
  Assert.assertEquals(firstOnRm2.getYarnApplicationState(),
      secondOnRm2.getYarnApplicationState());
}
} }