YARN-2853. Fixed a bug in ResourceManager causing apps to hang when the user kill request races with ApplicationMaster finish. Contributed by Jian He.

This commit is contained in:
Vinod Kumar Vavilapalli 2014-11-13 08:12:41 -08:00
parent 33ea5ae92b
commit 3651fe1b08
5 changed files with 136 additions and 9 deletions

View File

@ -87,6 +87,9 @@ Release 2.7.0 - UNRELEASED
YARN-2603. ApplicationConstants missing HADOOP_MAPRED_HOME (Ray Chiang via YARN-2603. ApplicationConstants missing HADOOP_MAPRED_HOME (Ray Chiang via
aw) aw)
YARN-2853. Fixed a bug in ResourceManager causing apps to hang when the user
kill request races with ApplicationMaster finish. (Jian He via vinodkv)
Release 2.6.0 - 2014-11-15 Release 2.6.0 - 2014-11-15
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -348,6 +348,7 @@ public class ApplicationMasterService extends AbstractService implements
// ApplicationDoesNotExistInCacheException before and after // ApplicationDoesNotExistInCacheException before and after
// RM work-preserving restart. // RM work-preserving restart.
if (rmApp.isAppFinalStateStored()) { if (rmApp.isAppFinalStateStored()) {
LOG.info(rmApp.getApplicationId() + " unregistered successfully. ");
return FinishApplicationMasterResponse.newInstance(true); return FinishApplicationMasterResponse.newInstance(true);
} }

View File

@ -293,13 +293,23 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.ATTEMPT_KILLED, RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition( new FinalSavingTransition(
new AppKilledTransition(), RMAppState.KILLED)) new AppKilledTransition(), RMAppState.KILLED))
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_UNREGISTERED,
new FinalSavingTransition(
new AttemptUnregisteredTransition(),
RMAppState.FINISHING, RMAppState.FINISHED))
.addTransition(RMAppState.KILLING, RMAppState.FINISHED,
// UnManagedAM directly jumps to finished
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
.addTransition(RMAppState.KILLING,
EnumSet.of(RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.KILLING))
.addTransition(RMAppState.KILLING, RMAppState.KILLING, .addTransition(RMAppState.KILLING, RMAppState.KILLING,
EnumSet.of( EnumSet.of(
RMAppEventType.NODE_UPDATE, RMAppEventType.NODE_UPDATE,
RMAppEventType.ATTEMPT_REGISTERED, RMAppEventType.ATTEMPT_REGISTERED,
RMAppEventType.ATTEMPT_UNREGISTERED,
RMAppEventType.ATTEMPT_FINISHED,
RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.APP_UPDATE_SAVED, RMAppEventType.APP_UPDATE_SAVED,
RMAppEventType.KILL, RMAppEventType.MOVE)) RMAppEventType.KILL, RMAppEventType.MOVE))
@ -1199,6 +1209,14 @@ public class RMAppImpl implements RMApp, Recoverable {
+ app.maxAppAttempts); + app.maxAppAttempts);
if (!app.submissionContext.getUnmanagedAM() if (!app.submissionContext.getUnmanagedAM()
&& numberOfFailure < app.maxAppAttempts) { && numberOfFailure < app.maxAppAttempts) {
if (initialState.equals(RMAppState.KILLING)) {
// If this is not last attempt, app should be killed instead of
// launching a new attempt
app.rememberTargetTransitionsAndStoreState(event,
new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED);
return RMAppState.FINAL_SAVING;
}
boolean transferStateFromPreviousAttempt; boolean transferStateFromPreviousAttempt;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt = transferStateFromPreviousAttempt =

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import org.junit.Before; import org.junit.Before;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import java.util.ArrayList; import java.util.ArrayList;
@ -37,16 +38,19 @@ import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -57,6 +61,8 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -73,6 +79,8 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public class TestRM extends ParameterizedSchedulerTestBase { public class TestRM extends ParameterizedSchedulerTestBase {
@ -638,4 +646,107 @@ public class TestRM extends ParameterizedSchedulerTestBase {
Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted()); Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted());
} }
// Test Kill an app while the app is finishing in the meanwhile.
@Test (timeout = 30000)
public void testKillFinishingApp() throws Exception{
// this dispatcher ignores RMAppAttemptEventType.KILL event
final Dispatcher dispatcher = new AsyncDispatcher() {
@Override
public EventHandler getEventHandler() {
class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
@Override
public boolean matches(Object argument) {
if (argument instanceof RMAppAttemptEvent) {
if (((RMAppAttemptEvent) argument).getType().equals(
RMAppAttemptEventType.KILL)) {
return true;
}
}
return false;
}
}
EventHandler handler = spy(super.getEventHandler());
doNothing().when(handler).handle(argThat(new EventArgMatcher()));
return handler;
}
};
MockRM rm1 = new MockRM(conf){
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
rm1.killApp(app1.getApplicationId());
FinishApplicationMasterRequest req =
FinishApplicationMasterRequest.newInstance(
FinalApplicationStatus.SUCCEEDED, "", "");
am1.unregisterAppAttempt(req,true);
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
rm1.waitForState(app1.getApplicationId(), RMAppState.FINISHED);
}
// Test Kill an app while the app is failing
@Test (timeout = 30000)
public void testKillFailingApp() throws Exception{
// this dispatcher ignores RMAppAttemptEventType.KILL event
final Dispatcher dispatcher = new AsyncDispatcher() {
@Override
public EventHandler getEventHandler() {
class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
@Override
public boolean matches(Object argument) {
if (argument instanceof RMAppAttemptEvent) {
if (((RMAppAttemptEvent) argument).getType().equals(
RMAppAttemptEventType.KILL)) {
return true;
}
}
return false;
}
}
EventHandler handler = spy(super.getEventHandler());
doNothing().when(handler).handle(argThat(new EventArgMatcher()));
return handler;
}
};
MockRM rm1 = new MockRM(conf){
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
rm1.killApp(app1.getApplicationId());
// fail the app by sending container_finished event.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
// app is killed, not launching a new attempt
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
}
} }

View File

@ -726,12 +726,6 @@ public class TestRMAppTransitions {
application.handle(event); application.handle(event);
rmDispatcher.await(); rmDispatcher.await();
// Ignore Attempt_Finished if we were supposed to go to Finished.
assertAppState(RMAppState.KILLING, application);
RMAppEvent finishEvent =
new RMAppFinishedAttemptEvent(application.getApplicationId(), null);
application.handle(finishEvent);
assertAppState(RMAppState.KILLING, application);
sendAttemptUpdateSavedEvent(application); sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application); sendAppUpdateSavedEvent(application);
assertKilled(application); assertKilled(application);