YARN-2416. InvalidStateTransitonException in ResourceManager if AMLauncher does not receive response for startContainers() call in time. Contributed by Jonathan Eagles

(cherry picked from commit 3efcd51c3b3eb667d83e08b500bb7a7ea559fabe)

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
This commit is contained in:
Jason Lowe 2017-08-22 12:56:09 -05:00
parent 3f735ad640
commit 0b64773678
2 changed files with 41 additions and 16 deletions

View File

@ -180,7 +180,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
new ExpiredTransition();
private static final AttemptFailedTransition FAILED_TRANSITION =
new AttemptFailedTransition();
private static final AMRegisteredTransition REGISTERED_TRANSITION =
new AMRegisteredTransition();
private static final AMLaunchedTransition LAUNCHED_TRANSITION =
new AMLaunchedTransition();
private RMAppAttemptEvent eventCausingFinalSaving;
private RMAppAttemptState targetedFinalState;
private RMAppAttemptState recoveredFinalState;
@ -310,7 +313,7 @@ RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
// Transitions from ALLOCATED State
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
RMAppAttemptEventType.LAUNCHED, LAUNCHED_TRANSITION)
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.LAUNCH_FAILED,
new FinalSavingTransition(new LaunchFailedTransition(),
@ -324,6 +327,8 @@ RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
RMAppAttemptEventType.FAIL,
new FinalSavingTransition(FAILED_TRANSITION,
RMAppAttemptState.FAILED))
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.REGISTERED, REGISTERED_TRANSITION)
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.CONTAINER_FINISHED,
new FinalSavingTransition(
@ -331,7 +336,7 @@ RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
// Transitions from LAUNCHED State
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
RMAppAttemptEventType.REGISTERED, REGISTERED_TRANSITION)
.addTransition(RMAppAttemptState.LAUNCHED,
EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
@ -353,6 +358,8 @@ RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
RMAppAttemptState.FAILED))
// Transitions from RUNNING State
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.LAUNCHED)
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
@ -417,6 +424,7 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
RMAppAttemptState.FAILED,
RMAppAttemptState.FAILED,
EnumSet.of(
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.FAIL,
@ -434,6 +442,7 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
new FinalTransition(RMAppAttemptState.FINISHED))
.addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHING,
EnumSet.of(
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.STATUS_UPDATE,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
@ -447,6 +456,7 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
RMAppAttemptState.FINISHED,
RMAppAttemptState.FINISHED,
EnumSet.of(
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
@ -1226,7 +1236,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
* 2) OR AMLivelinessMonitor expires this attempt (when am doesn't
* heart beat back).
*/
(new AMLaunchedTransition()).transition(appAttempt, event);
LAUNCHED_TRANSITION.transition(appAttempt, event);
return RMAppAttemptState.LAUNCHED;
}
}
@ -1459,7 +1469,8 @@ private static class AMLaunchedTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
if (event.getType() == RMAppAttemptEventType.LAUNCHED) {
if (event.getType() == RMAppAttemptEventType.LAUNCHED
|| event.getType() == RMAppAttemptEventType.REGISTERED) {
appAttempt.launchAMEndTime = System.currentTimeMillis();
long delay = appAttempt.launchAMEndTime -
appAttempt.launchAMStartTime;
@ -1586,6 +1597,10 @@ private static final class AMRegisteredTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
if (!RMAppAttemptState.LAUNCHED.equals(appAttempt.getState())) {
// registered received before launch
LAUNCHED_TRANSITION.transition(appAttempt, event);
}
long delay = System.currentTimeMillis() - appAttempt.launchAMEndTime;
ClusterMetrics.getMetrics().addAMRegisterDelay(delay);
RMAppAttemptRegistrationEvent registrationEvent

View File

@ -525,12 +525,9 @@ private void testAppAttemptFailedState(Container container,
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
}
/**
* {@link RMAppAttemptState#LAUNCH}
*/
private void testAppAttemptLaunchedState(Container container) {
assertEquals(RMAppAttemptState.LAUNCHED,
applicationAttempt.getAppAttemptState());
private void testAppAttemptLaunchedState(Container container,
RMAppAttemptState state) {
assertEquals(state, applicationAttempt.getAppAttemptState());
assertEquals(container, applicationAttempt.getMasterContainer());
if (UserGroupInformation.isSecurityEnabled()) {
// ClientTokenMasterKey has been registered in SecretManager, it's able to
@ -685,13 +682,18 @@ private Container allocateApplicationAttempt() {
}
private void launchApplicationAttempt(Container container) {
launchApplicationAttempt(container, RMAppAttemptState.LAUNCHED);
}
private void launchApplicationAttempt(Container container,
RMAppAttemptState state) {
applicationAttempt.handle(
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED));
testAppAttemptLaunchedState(container);
testAppAttemptLaunchedState(container, state);
}
private void runApplicationAttempt(Container container,
String host,
int rpcPort,
@ -722,7 +724,7 @@ private void testUnmanagedAMSuccess(String url) {
when(submissionContext.getUnmanagedAM()).thenReturn(true);
// submit AM and check it goes to LAUNCHED state
scheduleApplicationAttempt();
testAppAttemptLaunchedState(null);
testAppAttemptLaunchedState(null, RMAppAttemptState.LAUNCHED);
verify(amLivelinessMonitor, times(1)).register(
applicationAttempt.getAppAttemptId());
@ -929,7 +931,15 @@ public void testAllocatedToFailed() {
applicationAttempt.createApplicationAttemptState());
testAppAttemptFailedState(amContainer, diagnostics);
}
@Test(timeout = 10000)
public void testAllocatedToRunning() {
Container amContainer = allocateApplicationAttempt();
// Register attempt event arrives before launched attempt event
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
launchApplicationAttempt(amContainer, RMAppAttemptState.RUNNING);
}
@Test(timeout = 10000)
public void testCreateAppAttemptReport() {
RMAppAttemptState[] attemptStates = RMAppAttemptState.values();