From 56020955fde64637a3659be080f042722658f3e0 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 13 Nov 2014 08:12:41 -0800 Subject: [PATCH] YARN-2853. Fixed a bug in ResourceManager causing apps to hang when the user kill request races with ApplicationMaster finish. Contributed by Jian He. (cherry picked from commit 3651fe1b089851b38be351c00a9899817166bf3e) Conflicts: hadoop-yarn-project/CHANGES.txt --- .../ApplicationMasterService.java | 1 + .../resourcemanager/rmapp/RMAppImpl.java | 24 +++- .../yarn/server/resourcemanager/TestRM.java | 111 ++++++++++++++++++ .../rmapp/TestRMAppTransitions.java | 6 - 4 files changed, 133 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 64988f537f0..d0b199f23a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -348,6 +348,7 @@ public class ApplicationMasterService extends AbstractService implements // ApplicationDoesNotExistInCacheException before and after // RM work-preserving restart. if (rmApp.isAppFinalStateStored()) { + LOG.info(rmApp.getApplicationId() + " unregistered successfully. "); return FinishApplicationMasterResponse.newInstance(true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index ad92cc463a7..aeb2cda6930 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -293,13 +293,23 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( new AppKilledTransition(), RMAppState.KILLED)) + .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_UNREGISTERED, + new FinalSavingTransition( + new AttemptUnregisteredTransition(), + RMAppState.FINISHING, RMAppState.FINISHED)) + .addTransition(RMAppState.KILLING, RMAppState.FINISHED, + // UnManagedAM directly jumps to finished + RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) + .addTransition(RMAppState.KILLING, + EnumSet.of(RMAppState.FINAL_SAVING), + RMAppEventType.ATTEMPT_FAILED, + new AttemptFailedTransition(RMAppState.KILLING)) + .addTransition(RMAppState.KILLING, RMAppState.KILLING, EnumSet.of( RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_REGISTERED, - RMAppEventType.ATTEMPT_UNREGISTERED, - RMAppEventType.ATTEMPT_FINISHED, - RMAppEventType.ATTEMPT_FAILED, RMAppEventType.APP_UPDATE_SAVED, RMAppEventType.KILL, RMAppEventType.MOVE)) @@ -1199,6 +1209,14 @@ public class RMAppImpl implements RMApp, Recoverable { + app.maxAppAttempts); if (!app.submissionContext.getUnmanagedAM() && numberOfFailure < app.maxAppAttempts) { + if (initialState.equals(RMAppState.KILLING)) { + // If this is not last attempt, app should be killed instead of + // launching a new attempt + app.rememberTargetTransitionsAndStoreState(event, + new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED); + return RMAppState.FINAL_SAVING; + } + boolean transferStateFromPreviousAttempt; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; transferStateFromPreviousAttempt = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 4865420b936..77d8cdf5243 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import org.junit.Before; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import java.util.ArrayList; @@ -37,16 +38,19 @@ import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -57,6 +61,8 @@ import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -73,6 +79,8 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @SuppressWarnings({"unchecked", "rawtypes"}) public class TestRM extends ParameterizedSchedulerTestBase { @@ -638,4 +646,107 @@ public class TestRM extends ParameterizedSchedulerTestBase { Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted()); } + // Test Kill an app while the app is finishing in the meanwhile. + @Test (timeout = 30000) + public void testKillFinishingApp() throws Exception{ + + // this dispatcher ignores RMAppAttemptEventType.KILL event + final Dispatcher dispatcher = new AsyncDispatcher() { + @Override + public EventHandler getEventHandler() { + + class EventArgMatcher extends ArgumentMatcher { + @Override + public boolean matches(Object argument) { + if (argument instanceof RMAppAttemptEvent) { + if (((RMAppAttemptEvent) argument).getType().equals( + RMAppAttemptEventType.KILL)) { + return true; + } + } + return false; + } + } + + EventHandler handler = spy(super.getEventHandler()); + doNothing().when(handler).handle(argThat(new EventArgMatcher())); + return handler; + } + }; + + MockRM rm1 = new MockRM(conf){ + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + rm1.killApp(app1.getApplicationId()); + + FinishApplicationMasterRequest req = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.SUCCEEDED, "", ""); + am1.unregisterAppAttempt(req,true); + + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHING); + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED); + rm1.waitForState(app1.getApplicationId(), RMAppState.FINISHED); + } + + // Test Kill an app while the app is failing + @Test (timeout = 30000) + public void testKillFailingApp() throws Exception{ + + // this dispatcher ignores RMAppAttemptEventType.KILL event + final Dispatcher dispatcher = new AsyncDispatcher() { + @Override + public EventHandler getEventHandler() { + + class EventArgMatcher extends ArgumentMatcher { + @Override + public boolean matches(Object argument) { + if (argument instanceof RMAppAttemptEvent) { + if (((RMAppAttemptEvent) argument).getType().equals( + RMAppAttemptEventType.KILL)) { + return true; + } + } + return false; + } + } + + EventHandler handler = spy(super.getEventHandler()); + doNothing().when(handler).handle(argThat(new EventArgMatcher())); + return handler; + } + }; + + MockRM rm1 = new MockRM(conf){ + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + rm1.killApp(app1.getApplicationId()); + + // fail the app by sending container_finished event. + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + // app is killed, not launching a new attempt + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index bbfb0ee05b0..e68d074122b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -726,12 +726,6 @@ public class TestRMAppTransitions { application.handle(event); rmDispatcher.await(); - // Ignore Attempt_Finished if we were supposed to go to Finished. - assertAppState(RMAppState.KILLING, application); - RMAppEvent finishEvent = - new RMAppFinishedAttemptEvent(application.getApplicationId(), null); - application.handle(finishEvent); - assertAppState(RMAppState.KILLING, application); sendAttemptUpdateSavedEvent(application); sendAppUpdateSavedEvent(application); assertKilled(application);