YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager after the submitApplication call goes through. Contributed by Xuan Gong.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576160 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2e57b1fe86
commit
0edda25373
|
@ -281,6 +281,9 @@ Release 2.4.0 - UNRELEASED
|
|||
after getting an application-ID but before submission and can still submit to
|
||||
the newly active RM with no issues. (Xuan Gong via vinodkv)
|
||||
|
||||
YARN-1764. Modified YarnClient to correctly handle failover of ResourceManager
|
||||
after the submitApplication call goes through. (Xuan Gong via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.io.retry.Idempotent;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
|
@ -58,7 +59,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
|
||||
/**
|
||||
* <p>The protocol between clients and the <code>ResourceManager</code>
|
||||
|
@ -107,7 +110,16 @@ public interface ApplicationClientProtocol {
|
|||
* {@link SubmitApplicationResponse} on accepting the submission and throws
|
||||
* an exception if it rejects the submission. However, this call needs to be
|
||||
* followed by {@link #getApplicationReport(GetApplicationReportRequest)}
|
||||
* to make sure that the application gets properly submitted.</p>
|
||||
* to make sure that the application gets properly submitted - obtaining a
|
||||
* {@link SubmitApplicationResponse} from ResourceManager doesn't guarantee
|
||||
* that RM 'remembers' this application beyond failover or restart. If RM
|
||||
* failover or RM restart happens before ResourceManager saves the
|
||||
* application's state successfully, the subsequent
|
||||
* {@link #getApplicationReport(GetApplicationReportRequest)} will throw
|
||||
* a {@link ApplicationNotFoundException}. The Clients need to re-submit
|
||||
* the application with the same {@link ApplicationSubmissionContext} when
|
||||
* it encounters the {@link ApplicationNotFoundException} on the
|
||||
* {@link #getApplicationReport(GetApplicationReportRequest)} call.</p>
|
||||
*
|
||||
* <p> In secure mode,the <code>ResourceManager</code> verifies access to
|
||||
* queues etc. before accepting the application submission.</p>
|
||||
|
@ -186,6 +198,7 @@ public interface ApplicationClientProtocol {
|
|||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@Idempotent
|
||||
public GetApplicationReportResponse getApplicationReport(
|
||||
GetApplicationReportRequest request)
|
||||
throws YarnException, IOException;
|
||||
|
|
|
@ -29,6 +29,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -45,7 +48,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
|
||||
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
|
||||
|
@ -84,16 +87,29 @@ public abstract class YarnClient extends AbstractService {
|
|||
|
||||
/**
|
||||
* <p>
|
||||
* Submit a new application to <code>YARN.</code> It is a blocking call, such
|
||||
* that it will not return {@link ApplicationId} until the submitted
|
||||
* application has been submitted and accepted by the ResourceManager.
|
||||
* Submit a new application to <code>YARN.</code> It is a blocking call - it
|
||||
* will not return {@link ApplicationId} until the submitted application is
|
||||
* submitted successfully and accepted by the ResourceManager.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* Should provide an {@link ApplicationId} when submits a new application,
|
||||
* otherwise, it will throw the {@link ApplicationIdNotProvidedException}
|
||||
* Users should provide an {@link ApplicationId} as part of the parameter
|
||||
* {@link ApplicationSubmissionContext} when submitting a new application,
|
||||
* otherwise it will throw the {@link ApplicationIdNotProvidedException}.
|
||||
* </p>
|
||||
*
|
||||
* <p>This internally calls {@link ApplicationClientProtocol#submitApplication
|
||||
* (SubmitApplicationRequest)}, and after that, it internally invokes
|
||||
* {@link ApplicationClientProtocol#getApplicationReport
|
||||
* (GetApplicationReportRequest)} and waits till it can make sure that the
|
||||
* application gets properly submitted. If RM fails over or RM restart
|
||||
* happens before ResourceManager saves the application's state,
|
||||
* {@link ApplicationClientProtocol
|
||||
* #getApplicationReport(GetApplicationReportRequest)} will throw
|
||||
* the {@link ApplicationNotFoundException}. This API automatically resubmits
|
||||
* the application with the same {@link ApplicationSubmissionContext} when it
|
||||
* catches the {@link ApplicationNotFoundException}</p>
|
||||
*
|
||||
* @param appContext
|
||||
* {@link ApplicationSubmissionContext} containing all the details
|
||||
* needed to submit a new application
|
||||
|
@ -102,8 +118,9 @@ public abstract class YarnClient extends AbstractService {
|
|||
* @throws IOException
|
||||
* @see #createApplication()
|
||||
*/
|
||||
public abstract ApplicationId submitApplication(ApplicationSubmissionContext appContext)
|
||||
throws YarnException, IOException;
|
||||
public abstract ApplicationId submitApplication(
|
||||
ApplicationSubmissionContext appContext) throws YarnException,
|
||||
IOException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
|
|
@ -187,35 +187,43 @@ public class YarnClientImpl extends YarnClient {
|
|||
int pollCount = 0;
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
//TODO: YARN-1764:Handle RM fail overs after the submitApplication call.
|
||||
while (true) {
|
||||
YarnApplicationState state =
|
||||
getApplicationReport(applicationId).getYarnApplicationState();
|
||||
if (!state.equals(YarnApplicationState.NEW) &&
|
||||
!state.equals(YarnApplicationState.NEW_SAVING)) {
|
||||
LOG.info("Submitted application " + applicationId);
|
||||
break;
|
||||
}
|
||||
|
||||
long elapsedMillis = System.currentTimeMillis() - startTime;
|
||||
if (enforceAsyncAPITimeout() &&
|
||||
elapsedMillis >= asyncApiPollTimeoutMillis) {
|
||||
throw new YarnException("Timed out while waiting for application " +
|
||||
applicationId + " to be submitted successfully");
|
||||
}
|
||||
|
||||
// Notify the client through the log every 10 poll, in case the client
|
||||
// is blocked here too long.
|
||||
if (++pollCount % 10 == 0) {
|
||||
LOG.info("Application submission is not finished, " +
|
||||
"submitted application " + applicationId +
|
||||
" is still in " + state);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(submitPollIntervalMillis);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.error("Interrupted while waiting for application " + applicationId
|
||||
+ " to be successfully submitted.");
|
||||
YarnApplicationState state =
|
||||
getApplicationReport(applicationId).getYarnApplicationState();
|
||||
if (!state.equals(YarnApplicationState.NEW) &&
|
||||
!state.equals(YarnApplicationState.NEW_SAVING)) {
|
||||
LOG.info("Submitted application " + applicationId);
|
||||
break;
|
||||
}
|
||||
|
||||
long elapsedMillis = System.currentTimeMillis() - startTime;
|
||||
if (enforceAsyncAPITimeout() &&
|
||||
elapsedMillis >= asyncApiPollTimeoutMillis) {
|
||||
throw new YarnException("Timed out while waiting for application " +
|
||||
applicationId + " to be submitted successfully");
|
||||
}
|
||||
|
||||
// Notify the client through the log every 10 poll, in case the client
|
||||
// is blocked here too long.
|
||||
if (++pollCount % 10 == 0) {
|
||||
LOG.info("Application submission is not finished, " +
|
||||
"submitted application " + applicationId +
|
||||
" is still in " + state);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(submitPollIntervalMillis);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.error("Interrupted while waiting for application "
|
||||
+ applicationId
|
||||
+ " to be successfully submitted.");
|
||||
}
|
||||
} catch (ApplicationNotFoundException ex) {
|
||||
// FailOver or RM restart happens before RMStateStore saves
|
||||
// ApplicationState
|
||||
LOG.info("Re-submit application " + applicationId + "with the " +
|
||||
"same ApplicationSubmissionContext");
|
||||
rmClient.submitApplication(request);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,8 +24,10 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -80,10 +82,141 @@ public class TestSubmitApplicationWithRMHA extends RMHATestBase{
|
|||
count++;
|
||||
}
|
||||
// Verify submittion is successful
|
||||
Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
|
||||
.getYarnApplicationState() == YarnApplicationState.NEW);
|
||||
Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
|
||||
.getYarnApplicationState() == YarnApplicationState.NEW_SAVING);
|
||||
YarnApplicationState state =
|
||||
rm.getApplicationReport(app.getApplicationId())
|
||||
.getYarnApplicationState();
|
||||
Assert.assertTrue(state == YarnApplicationState.ACCEPTED
|
||||
|| state == YarnApplicationState.SUBMITTED);
|
||||
Assert.assertEquals(expectedAppId, app.getApplicationId());
|
||||
}
|
||||
|
||||
// There are two scenarios when RM failover happens
|
||||
// after SubmitApplication Call:
|
||||
// 1) RMStateStore already saved the ApplicationState when failover happens
|
||||
// 2) RMStateStore did not save the ApplicationState when failover happens
|
||||
|
||||
@Test
|
||||
public void
|
||||
testHandleRMHAafterSubmitApplicationCallWithSavedApplicationState()
|
||||
throws Exception {
|
||||
// Test scenario 1 when RM failover happens
|
||||
// after SubmitApplication Call:
|
||||
// RMStateStore already saved the ApplicationState when failover happens
|
||||
startRMs();
|
||||
|
||||
// Submit Application
|
||||
// After submission, the applicationState will be saved in RMStateStore.
|
||||
RMApp app0 = rm1.submitApp(200);
|
||||
|
||||
// Do the failover
|
||||
explicitFailover();
|
||||
|
||||
// Since the applicationState has already been saved in RMStateStore
|
||||
// before failover happens, the current active rm can load the previous
|
||||
// applicationState.
|
||||
ApplicationReport appReport =
|
||||
rm2.getApplicationReport(app0.getApplicationId());
|
||||
|
||||
// verify previous submission is successful.
|
||||
Assert.assertTrue(appReport.getYarnApplicationState()
|
||||
== YarnApplicationState.ACCEPTED ||
|
||||
appReport.getYarnApplicationState()
|
||||
== YarnApplicationState.SUBMITTED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void
|
||||
testHandleRMHAafterSubmitApplicationCallWithoutSavedApplicationState()
|
||||
throws Exception {
|
||||
// Test scenario 2 when RM failover happens
|
||||
// after SubmitApplication Call:
|
||||
// RMStateStore did not save the ApplicationState when failover happens.
|
||||
// Using customized RMAppManager.
|
||||
startRMsWithCustomizedRMAppManager();
|
||||
|
||||
// Submit Application
|
||||
// After submission, the applicationState will
|
||||
// not be saved in RMStateStore
|
||||
RMApp app0 =
|
||||
rm1.submitApp(200, "", UserGroupInformation
|
||||
.getCurrentUser().getShortUserName(), null, false, null,
|
||||
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
|
||||
false, false);
|
||||
|
||||
// Do the failover
|
||||
explicitFailover();
|
||||
|
||||
// Since the applicationState is not saved in RMStateStore
|
||||
// when failover happens. The current active RM can not load
|
||||
// previous applicationState.
|
||||
// Expect ApplicationNotFoundException by calling getApplicationReport().
|
||||
try {
|
||||
rm2.getApplicationReport(app0.getApplicationId());
|
||||
Assert.fail("Should get ApplicationNotFoundException here");
|
||||
} catch (ApplicationNotFoundException ex) {
|
||||
// expected ApplicationNotFoundException
|
||||
}
|
||||
|
||||
// Submit the application with previous ApplicationId to current active RM
|
||||
// This will mimic the similar behavior of YarnClient which will re-submit
|
||||
// Application with previous applicationId
|
||||
// when catches the ApplicationNotFoundException
|
||||
RMApp app1 =
|
||||
rm2.submitApp(200, "", UserGroupInformation
|
||||
.getCurrentUser().getShortUserName(), null, false, null,
|
||||
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
|
||||
false, false, true, app0.getApplicationId());
|
||||
|
||||
verifySubmitApp(rm2, app1, app0.getApplicationId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test multiple calls of getApplicationReport, to make sure
|
||||
* it is idempotent
|
||||
*/
|
||||
@Test
|
||||
public void testGetApplicationReportIdempotent() throws Exception{
|
||||
// start two RMs, and transit rm1 to active, rm2 to standby
|
||||
startRMs();
|
||||
|
||||
// Submit Application
|
||||
// After submission, the applicationState will be saved in RMStateStore.
|
||||
RMApp app = rm1.submitApp(200);
|
||||
|
||||
ApplicationReport appReport1 =
|
||||
rm1.getApplicationReport(app.getApplicationId());
|
||||
Assert.assertTrue(appReport1.getYarnApplicationState() ==
|
||||
YarnApplicationState.ACCEPTED ||
|
||||
appReport1.getYarnApplicationState() ==
|
||||
YarnApplicationState.SUBMITTED);
|
||||
|
||||
// call getApplicationReport again
|
||||
ApplicationReport appReport2 =
|
||||
rm1.getApplicationReport(app.getApplicationId());
|
||||
Assert.assertEquals(appReport1.getApplicationId(),
|
||||
appReport2.getApplicationId());
|
||||
Assert.assertEquals(appReport1.getYarnApplicationState(),
|
||||
appReport2.getYarnApplicationState());
|
||||
|
||||
// Do the failover
|
||||
explicitFailover();
|
||||
|
||||
// call getApplicationReport
|
||||
ApplicationReport appReport3 =
|
||||
rm2.getApplicationReport(app.getApplicationId());
|
||||
Assert.assertEquals(appReport1.getApplicationId(),
|
||||
appReport3.getApplicationId());
|
||||
Assert.assertEquals(appReport1.getYarnApplicationState(),
|
||||
appReport3.getYarnApplicationState());
|
||||
|
||||
// call getApplicationReport again
|
||||
ApplicationReport appReport4 =
|
||||
rm2.getApplicationReport(app.getApplicationId());
|
||||
Assert.assertEquals(appReport3.getApplicationId(),
|
||||
appReport4.getApplicationId());
|
||||
Assert.assertEquals(appReport3.getYarnApplicationState(),
|
||||
appReport4.getYarnApplicationState());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue