YARN-2853. Fixed a bug in ResourceManager causing apps to hang when the user kill request races with ApplicationMaster finish. Contributed by Jian He.
(cherry picked from commit 3651fe1b08
)
Conflicts:
hadoop-yarn-project/CHANGES.txt
This commit is contained in:
parent
f8892660a8
commit
56020955fd
|
@ -348,6 +348,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
// ApplicationDoesNotExistInCacheException before and after
|
||||
// RM work-preserving restart.
|
||||
if (rmApp.isAppFinalStateStored()) {
|
||||
LOG.info(rmApp.getApplicationId() + " unregistered successfully. ");
|
||||
return FinishApplicationMasterResponse.newInstance(true);
|
||||
}
|
||||
|
||||
|
|
|
@ -293,13 +293,23 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
RMAppEventType.ATTEMPT_KILLED,
|
||||
new FinalSavingTransition(
|
||||
new AppKilledTransition(), RMAppState.KILLED))
|
||||
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
|
||||
RMAppEventType.ATTEMPT_UNREGISTERED,
|
||||
new FinalSavingTransition(
|
||||
new AttemptUnregisteredTransition(),
|
||||
RMAppState.FINISHING, RMAppState.FINISHED))
|
||||
.addTransition(RMAppState.KILLING, RMAppState.FINISHED,
|
||||
// UnManagedAM directly jumps to finished
|
||||
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
||||
.addTransition(RMAppState.KILLING,
|
||||
EnumSet.of(RMAppState.FINAL_SAVING),
|
||||
RMAppEventType.ATTEMPT_FAILED,
|
||||
new AttemptFailedTransition(RMAppState.KILLING))
|
||||
|
||||
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
|
||||
EnumSet.of(
|
||||
RMAppEventType.NODE_UPDATE,
|
||||
RMAppEventType.ATTEMPT_REGISTERED,
|
||||
RMAppEventType.ATTEMPT_UNREGISTERED,
|
||||
RMAppEventType.ATTEMPT_FINISHED,
|
||||
RMAppEventType.ATTEMPT_FAILED,
|
||||
RMAppEventType.APP_UPDATE_SAVED,
|
||||
RMAppEventType.KILL, RMAppEventType.MOVE))
|
||||
|
||||
|
@ -1199,6 +1209,14 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
+ app.maxAppAttempts);
|
||||
if (!app.submissionContext.getUnmanagedAM()
|
||||
&& numberOfFailure < app.maxAppAttempts) {
|
||||
if (initialState.equals(RMAppState.KILLING)) {
|
||||
// If this is not last attempt, app should be killed instead of
|
||||
// launching a new attempt
|
||||
app.rememberTargetTransitionsAndStoreState(event,
|
||||
new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED);
|
||||
return RMAppState.FINAL_SAVING;
|
||||
}
|
||||
|
||||
boolean transferStateFromPreviousAttempt;
|
||||
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
|
||||
transferStateFromPreviousAttempt =
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||
import org.junit.Before;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -37,16 +38,19 @@ import org.junit.Assert;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
|
@ -57,6 +61,8 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
|
|||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
|
@ -73,6 +79,8 @@ import org.apache.log4j.LogManager;
|
|||
import org.apache.log4j.Logger;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public class TestRM extends ParameterizedSchedulerTestBase {
|
||||
|
@ -638,4 +646,107 @@ public class TestRM extends ParameterizedSchedulerTestBase {
|
|||
Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted());
|
||||
}
|
||||
|
||||
// Test Kill an app while the app is finishing in the meanwhile.
|
||||
@Test (timeout = 30000)
|
||||
public void testKillFinishingApp() throws Exception{
|
||||
|
||||
// this dispatcher ignores RMAppAttemptEventType.KILL event
|
||||
final Dispatcher dispatcher = new AsyncDispatcher() {
|
||||
@Override
|
||||
public EventHandler getEventHandler() {
|
||||
|
||||
class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
|
||||
@Override
|
||||
public boolean matches(Object argument) {
|
||||
if (argument instanceof RMAppAttemptEvent) {
|
||||
if (((RMAppAttemptEvent) argument).getType().equals(
|
||||
RMAppAttemptEventType.KILL)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
EventHandler handler = spy(super.getEventHandler());
|
||||
doNothing().when(handler).handle(argThat(new EventArgMatcher()));
|
||||
return handler;
|
||||
}
|
||||
};
|
||||
|
||||
MockRM rm1 = new MockRM(conf){
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return dispatcher;
|
||||
}
|
||||
};
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
rm1.killApp(app1.getApplicationId());
|
||||
|
||||
FinishApplicationMasterRequest req =
|
||||
FinishApplicationMasterRequest.newInstance(
|
||||
FinalApplicationStatus.SUCCEEDED, "", "");
|
||||
am1.unregisterAppAttempt(req,true);
|
||||
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
|
||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.FINISHED);
|
||||
}
|
||||
|
||||
// Test Kill an app while the app is failing
|
||||
@Test (timeout = 30000)
|
||||
public void testKillFailingApp() throws Exception{
|
||||
|
||||
// this dispatcher ignores RMAppAttemptEventType.KILL event
|
||||
final Dispatcher dispatcher = new AsyncDispatcher() {
|
||||
@Override
|
||||
public EventHandler getEventHandler() {
|
||||
|
||||
class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
|
||||
@Override
|
||||
public boolean matches(Object argument) {
|
||||
if (argument instanceof RMAppAttemptEvent) {
|
||||
if (((RMAppAttemptEvent) argument).getType().equals(
|
||||
RMAppAttemptEventType.KILL)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
EventHandler handler = spy(super.getEventHandler());
|
||||
doNothing().when(handler).handle(argThat(new EventArgMatcher()));
|
||||
return handler;
|
||||
}
|
||||
};
|
||||
|
||||
MockRM rm1 = new MockRM(conf){
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return dispatcher;
|
||||
}
|
||||
};
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
rm1.killApp(app1.getApplicationId());
|
||||
|
||||
// fail the app by sending container_finished event.
|
||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||
// app is killed, not launching a new attempt
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -726,12 +726,6 @@ public class TestRMAppTransitions {
|
|||
application.handle(event);
|
||||
rmDispatcher.await();
|
||||
|
||||
// Ignore Attempt_Finished if we were supposed to go to Finished.
|
||||
assertAppState(RMAppState.KILLING, application);
|
||||
RMAppEvent finishEvent =
|
||||
new RMAppFinishedAttemptEvent(application.getApplicationId(), null);
|
||||
application.handle(finishEvent);
|
||||
assertAppState(RMAppState.KILLING, application);
|
||||
sendAttemptUpdateSavedEvent(application);
|
||||
sendAppUpdateSavedEvent(application);
|
||||
assertKilled(application);
|
||||
|
|
Loading…
Reference in New Issue