From 575c05635704868c5264a3b0e5bd203cca395d0f Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 16 May 2016 15:40:23 +0000 Subject: [PATCH] YARN-4325. Nodemanager log handlers fail to send finished/failed events in some cases. Contributed by Junping Du (cherry picked from commit 81effb7dcde2b31423438d6f1b8b8204d4ca05b3) --- .../application/ApplicationImpl.java | 8 +- .../logaggregation/AppLogAggregatorImpl.java | 4 + .../logaggregation/LogAggregationService.java | 5 +- .../loghandler/NonAggregatingLogHandler.java | 4 + .../TestContainerManagerRecovery.java | 84 +++++++++++++++++++ .../TestNonAggregatingLogHandler.java | 58 ++++++++++++- 6 files changed, 155 insertions(+), 8 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index fbc8453b3cc..efa258a467c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -207,18 +207,18 @@ public class ApplicationImpl implements Application { ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, ApplicationEventType.APPLICATION_INITED, ApplicationEventType.FINISH_APPLICATION)) - + // Transitions from FINISHED state .addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED, - ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, + EnumSet.of( + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED), new AppLogsAggregatedTransition()) .addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED, EnumSet.of( ApplicationEventType.APPLICATION_LOG_HANDLING_INITED, - ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED, ApplicationEventType.FINISH_APPLICATION)) - // create the topology tables .installTopology(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index fed4a3bab65..32b0934807c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -501,6 +501,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } } + @SuppressWarnings("unchecked") @Override public void run() { try { @@ -513,6 +514,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } finally { if (!this.appAggregationFinished.get()) { LOG.warn("Aggregation did not complete for application " + appId); + this.dispatcher.getEventHandler().handle( + new ApplicationEvent(this.appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); } this.appAggregationFinished.set(true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index d46f7a3291b..189e7ff9dc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -416,7 +416,6 @@ public class LogAggregationService extends AbstractService implements // A container is complete. Put this containers' logs up for aggregation if // this containers' logs are needed. - AppLogAggregator aggregator = this.appLogAggregators.get( containerId.getApplicationAttemptId().getApplicationId()); if (aggregator == null) { @@ -436,6 +435,7 @@ public class LogAggregationService extends AbstractService implements new ContainerLogContext(containerId, containerType, exitCode)); } + @SuppressWarnings("unchecked") private void stopApp(ApplicationId appId) { // App is complete. Finish up any containers' pending log aggregation and @@ -445,6 +445,9 @@ public class LogAggregationService extends AbstractService implements if (aggregator == null) { LOG.warn("Log aggregation is not initialized for " + appId + ", did it fail to start?"); + this.dispatcher.getEventHandler().handle( + new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); return; } aggregator.finishLogAggregation(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index d42a4e7ce2e..290174352fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -171,6 +171,10 @@ public class NonAggregatingLogHandler extends AbstractService implements String user = appOwners.remove(appId); if (user == null) { LOG.error("Unable to locate user for " + appId); + // send LOG_HANDLING_FAILED out + NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle( + new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); break; } LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 9fa3fcc13c8..762a99a9289 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -294,6 +294,90 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest { cm.stop(); } + @Test + public void testNMRecoveryForAppFinishedWithLogAggregationFailure() + throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + Context context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context); + cm.init(conf); + cm.start(); + + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + Map localResources = Collections.emptyMap(); + Map containerEnv = Collections.emptyMap(); + List containerCmds = Collections.emptyList(); + Map serviceData = Collections.emptyMap(); + + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, containerCmds, serviceData, + null, null); + + StartContainersResponse startResponse = startContainer(context, cm, cid, + clc, null); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + Application app = context.getApplications().get(appId); + assertNotNull(app); + waitForAppState(app, ApplicationState.INITING); + + // simulate application completion + List finishedApps = new ArrayList(); + finishedApps.add(appId); + cm.handle(new CMgrCompletedAppsEvent(finishedApps, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP); + + app.handle(new ApplicationEvent(app.getAppId(), + ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)); + assertEquals(app.getApplicationState(), ApplicationState.FINISHED); + // application is still in NM context. + assertEquals(1, context.getApplications().size()); + + // restart and verify app is still there and marked as finished. + cm.stop(); + context = createContext(conf, stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + + // no longer saving FINISH_APP event in NM stateStore, + // simulate by resending FINISH_APP event + cm.handle(new CMgrCompletedAppsEvent(finishedApps, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP); + // TODO need to figure out why additional APPLICATION_RESOURCES_CLEANEDUP + // is needed. + app.handle(new ApplicationEvent(app.getAppId(), + ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)); + assertEquals(app.getApplicationState(), ApplicationState.FINISHED); + + // simulate log aggregation failed. + app.handle(new ApplicationEvent(app.getAppId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)); + + // restart and verify app is no longer present after recovery + cm.stop(); + context = createContext(conf, stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertTrue(context.getApplications().isEmpty()); + cm.stop(); + } + @Test public void testContainerResizeRecovery() throws Exception { conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index 46d06daf247..ec3757e6806 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; @@ -84,7 +87,7 @@ public class TestNonAggregatingLogHandler { DeletionService mockDelService; Configuration conf; DrainDispatcher dispatcher; - EventHandler appEventHandler; + private ApplicationEventHandler appEventHandler; String user = "testuser"; ApplicationId appId; ApplicationAttemptId appAttemptId; @@ -97,7 +100,7 @@ public class TestNonAggregatingLogHandler { mockDelService = mock(DeletionService.class); conf = new YarnConfiguration(); dispatcher = createDispatcher(conf); - appEventHandler = mock(EventHandler.class); + appEventHandler = new ApplicationEventHandler(); dispatcher.register(ApplicationEventType.class, appEventHandler); appId = BuilderUtils.newApplicationId(1234, 1); appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1); @@ -345,6 +348,9 @@ public class TestNonAggregatingLogHandler { dirsHandler.init(conf); + appEventHandler.resetLogHandlingEvent(); + assertFalse(appEventHandler.receiveLogHandlingFinishEvent()); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); stateStore.init(conf); stateStore.start(); @@ -377,8 +383,21 @@ public class TestNonAggregatingLogHandler { logHandler.start(); verify(logHandler.mockSched, never()).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + + // wait events get drained. + this.dispatcher.await(); + assertTrue(appEventHandler.receiveLogHandlingFinishEvent()); + + appEventHandler.resetLogHandlingEvent(); + assertFalse(appEventHandler.receiveLogHandlingFailedEvent()); + // send an app finish event against a removed app + logHandler.handle(new LogHandlerAppFinishedEvent(appId)); + this.dispatcher.await(); + // verify to receive a log failed event. + assertTrue(appEventHandler.receiveLogHandlingFailedEvent()); + assertFalse(appEventHandler.receiveLogHandlingFinishEvent()); logHandler.close(); - } + } /** * Function to run a log handler with directories failing the getFileStatus @@ -536,4 +555,37 @@ public class TestNonAggregatingLogHandler { } return dirs; } + + class ApplicationEventHandler implements EventHandler { + + private boolean logHandlingFinished = false; + private boolean logHandlingFailed = false; + + @Override + public void handle(ApplicationEvent event) { + switch (event.getType()) { + case APPLICATION_LOG_HANDLING_FINISHED: + logHandlingFinished = true; + break; + case APPLICATION_LOG_HANDLING_FAILED: + logHandlingFailed = true; + default: + // do nothing. + } + } + + public boolean receiveLogHandlingFinishEvent() { + return logHandlingFinished; + } + + public boolean receiveLogHandlingFailedEvent() { + return logHandlingFailed; + } + + public void resetLogHandlingEvent() { + logHandlingFinished = false; + logHandlingFailed = false; + } + } + }