From 8eb39f775c09b255986a71eb375c13c0bbc74f6b Mon Sep 17 00:00:00 2001 From: Eric Payne Date: Fri, 25 Aug 2017 15:37:54 -0500 Subject: [PATCH] YARN-7087. NM failed to perform log aggregation due to absent container. Contributed by Jason Lowe. (cherry picked from commit e864f81471407a384395fefe1ceb3b66fc7f87f2) --- .../container/ContainerImpl.java | 2 +- .../logaggregation/LogAggregationService.java | 13 +-- .../LogHandlerContainerFinishedEvent.java | 9 +- .../TestLogAggregationService.java | 105 +++++++++--------- .../TestNonAggregatingLogHandler.java | 10 +- 5 files changed, 69 insertions(+), 70 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 9d1d3c77d7a..7dc8ab6b059 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -495,7 +495,7 @@ private void sendFinishedEvents() { eventHandler.handle(new ContainerStopMonitoringEvent(containerId)); // Tell the logService too eventHandler.handle(new LogHandlerContainerFinishedEvent( - containerId, exitCode)); + containerId, containerTokenIdentifier.getContainerType(), exitCode)); } @SuppressWarnings("unchecked") // dispatcher not typed diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index de89a968478..5bf9dbfd20f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; @@ -426,7 +425,8 @@ int getNumAggregators() { return this.appLogAggregators.size(); } - private void stopContainer(ContainerId containerId, int exitCode) { + private void stopContainer(ContainerId containerId, + ContainerType containerType, int exitCode) { // A container is complete. Put this containers' logs up for aggregation if // this containers' logs are needed. @@ -437,14 +437,6 @@ private void stopContainer(ContainerId containerId, int exitCode) { + ", did it fail to start?"); return; } - Container container = context.getContainers().get(containerId); - if (null == container) { - LOG.warn("Log aggregation cannot be started for " + containerId - + ", as its an absent container"); - return; - } - ContainerType containerType = - container.getContainerTokenIdentifier().getContainerType(); aggregator.startContainerLogAggregation( new ContainerLogContext(containerId, containerType, exitCode)); } @@ -482,6 +474,7 @@ public void handle(LogHandlerEvent event) { LogHandlerContainerFinishedEvent containerFinishEvent = (LogHandlerContainerFinishedEvent) event; stopContainer(containerFinishEvent.getContainerId(), + containerFinishEvent.getContainerType(), containerFinishEvent.getExitCode()); break; case APPLICATION_FINISHED: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java index 038006edcf6..3f4b6a076fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java @@ -19,16 +19,19 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.ContainerType; public class LogHandlerContainerFinishedEvent extends LogHandlerEvent { private final ContainerId containerId; + private final ContainerType containerType; private final int exitCode; public LogHandlerContainerFinishedEvent(ContainerId containerId, - int exitCode) { + ContainerType containerType, int exitCode) { super(LogHandlerEventType.CONTAINER_FINISHED); this.containerId = containerId; + this.containerType = containerType; this.exitCode = exitCode; } @@ -36,6 +39,10 @@ public ContainerId getContainerId() { return this.containerId; } + public ContainerType getContainerType() { + return containerType; + } + public int getExitCode() { return this.exitCode; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 0667d198804..4afe57fe775 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -133,7 +133,6 @@ import com.google.common.base.Supplier; -//@Ignore public class TestLogAggregationService extends BaseContainerManagerTest { private Map acls = createAppAcls(); @@ -199,13 +198,13 @@ private void verifyLocalFileDeletion( ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId container11 = createContainer(appAttemptId, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container11 = ContainerId.newContainerId(appAttemptId, 1); // Simulate log-file creation writeContainerLogs(app1LogDir, container11, new String[] { "stdout", "stderr", "syslog" }); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container11, 0)); + new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); logAggregationService.handle(new LogHandlerAppFinishedEvent( application1)); @@ -321,11 +320,11 @@ public void testNoLogsUploadedOnAppFinish() throws Exception { ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(app, 1); - ContainerId cont = createContainer(appAttemptId, 1, - ContainerType.APPLICATION_MASTER); + ContainerId cont = ContainerId.newContainerId(appAttemptId, 1); writeContainerLogs(appLogDir, cont, new String[] { "stdout", "stderr", "syslog" }); - logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, 0)); + logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, + ContainerType.APPLICATION_MASTER, 0)); logAggregationService.handle(new LogHandlerAppFinishedEvent(app)); logAggregationService.stop(); delSrvc.stop(); @@ -407,13 +406,13 @@ public void testMultipleAppsLogAggregation() throws Exception { ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId container11 = createContainer(appAttemptId1, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container11 = ContainerId.newContainerId(appAttemptId1, 1); // Simulate log-file creation writeContainerLogs(app1LogDir, container11, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container11, 0)); + new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2); ApplicationAttemptId appAttemptId2 = @@ -430,19 +429,19 @@ public void testMultipleAppsLogAggregation() throws Exception { logAggregationService.handle(new LogHandlerAppStartedEvent( application2, this.user, null, this.acls, contextWithAMOnly)); - ContainerId container21 = createContainer(appAttemptId2, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container21 = ContainerId.newContainerId(appAttemptId2, 1); writeContainerLogs(app2LogDir, container21, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container21, 0)); + new LogHandlerContainerFinishedEvent(container21, + ContainerType.APPLICATION_MASTER, 0)); - ContainerId container12 = createContainer(appAttemptId1, 2, - ContainerType.TASK); + ContainerId container12 = ContainerId.newContainerId(appAttemptId1, 2); writeContainerLogs(app1LogDir, container12, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container12, 0)); + new LogHandlerContainerFinishedEvent(container12, + ContainerType.TASK, 0)); ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3); ApplicationAttemptId appAttemptId3 = @@ -474,29 +473,29 @@ public void testMultipleAppsLogAggregation() throws Exception { checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID"); reset(appEventHandler); - ContainerId container31 = createContainer(appAttemptId3, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container31 = ContainerId.newContainerId(appAttemptId3, 1); writeContainerLogs(app3LogDir, container31, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container31, 0)); + new LogHandlerContainerFinishedEvent(container31, + ContainerType.APPLICATION_MASTER, 0)); - ContainerId container32 = createContainer(appAttemptId3, 2, - ContainerType.TASK); + ContainerId container32 = ContainerId.newContainerId(appAttemptId3, 2); writeContainerLogs(app3LogDir, container32, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container32, 1)); // Failed + new LogHandlerContainerFinishedEvent(container32, + ContainerType.TASK, 1)); // Failed - ContainerId container22 = createContainer(appAttemptId2, 2, - ContainerType.TASK); + ContainerId container22 = ContainerId.newContainerId(appAttemptId2, 2); writeContainerLogs(app2LogDir, container22, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container22, 0)); + new LogHandlerContainerFinishedEvent(container22, + ContainerType.TASK, 0)); - ContainerId container33 = createContainer(appAttemptId3, 3, - ContainerType.TASK); + ContainerId container33 = ContainerId.newContainerId(appAttemptId3, 3); writeContainerLogs(app3LogDir, container33, fileNames); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container33, 0)); + new LogHandlerContainerFinishedEvent(container33, + ContainerType.TASK, 0)); logAggregationService.handle(new LogHandlerAppFinishedEvent( application2)); @@ -750,7 +749,8 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM logAggregationService.handle(new LogHandlerContainerFinishedEvent( - BuilderUtils.newContainerId(4, 1, 1, 1), 0)); + BuilderUtils.newContainerId(4, 1, 1, 1), + ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); @@ -802,7 +802,8 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM() // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM logAggregationService.handle(new LogHandlerContainerFinishedEvent( - BuilderUtils.newContainerId(4, 1, 1, 1), 0)); + BuilderUtils.newContainerId(4, 1, 1, 1), + ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); @@ -1325,14 +1326,13 @@ public void testLogAggregationServiceWithPatterns() throws Exception { ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId container1 = createContainer(appAttemptId1, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container1 = ContainerId.newContainerId(appAttemptId1, 1); // Simulate log-file creation writeContainerLogs(appLogDir1, container1, new String[] { "stdout", "stderr", "syslog" }); logAggregationService.handle(new LogHandlerContainerFinishedEvent( - container1, 0)); + container1, ContainerType.APPLICATION_MASTER, 0)); // LogContext for application2 has excludePatten which includes // stdout and syslog. @@ -1348,13 +1348,13 @@ public void testLogAggregationServiceWithPatterns() throws Exception { AMOnlyLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(application2, this.user, null, this.acls, LogAggregationContextWithExcludePatterns)); - ContainerId container2 = createContainer(appAttemptId2, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container2 = ContainerId.newContainerId(appAttemptId2, 1); writeContainerLogs(app2LogDir, container2, new String[] { "stdout", "stderr", "syslog" }); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container2, 0)); + new LogHandlerContainerFinishedEvent(container2, + ContainerType.APPLICATION_MASTER, 0)); // LogContext for application3 has includePattern which is *.log and // excludePatten which includes std.log and sys.log. @@ -1373,12 +1373,12 @@ public void testLogAggregationServiceWithPatterns() throws Exception { AMOnlyLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(application3, this.user, null, this.acls, context1)); - ContainerId container3 = createContainer(appAttemptId3, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container3 = ContainerId.newContainerId(appAttemptId3, 1); writeContainerLogs(app3LogDir, container3, new String[] { "stdout", "sys.log", "std.log", "out.log", "err.log", "log" }); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container3, 0)); + new LogHandlerContainerFinishedEvent(container3, + ContainerType.APPLICATION_MASTER, 0)); // LogContext for application4 has includePattern // which includes std.log and sys.log and @@ -1398,12 +1398,12 @@ public void testLogAggregationServiceWithPatterns() throws Exception { AMOnlyLogAggregationPolicy.class.getName()); logAggregationService.handle(new LogHandlerAppStartedEvent(application4, this.user, null, this.acls, context2)); - ContainerId container4 = createContainer(appAttemptId4, 1, - ContainerType.APPLICATION_MASTER); + ContainerId container4 = ContainerId.newContainerId(appAttemptId4, 1); writeContainerLogs(app4LogDir, container4, new String[] { "stdout", "sys.log", "std.log", "out.log", "err.log", "log" }); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container4, 0)); + new LogHandlerContainerFinishedEvent(container4, + ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); ApplicationEvent expectedInitEvents[] = @@ -1530,7 +1530,8 @@ public void testLogAggregationServiceWithPatternsAndIntervals() new ContainerId[] {container}, logFiles, 1, true); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container, 0)); + new LogHandlerContainerFinishedEvent(container, + ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); @@ -1654,14 +1655,8 @@ public void testLogAggregationAbsentContainer() throws Exception { ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(appId, 1); ContainerId containerId = BuilderUtils.newContainerId(appAttemptId1, 2l); - try { - logAggregationService.handle(new LogHandlerContainerFinishedEvent( - containerId, 100)); - assertTrue("Should skip when null containerID", true); - } catch (Exception e) { - Assert.assertFalse("Exception not expected should skip null containerid", - true); - } + logAggregationService.handle(new LogHandlerContainerFinishedEvent( + containerId, ContainerType.APPLICATION_MASTER, 100)); } @Test (timeout = 50000) @@ -1986,8 +1981,7 @@ private ContainerId finishContainer(ApplicationId application1, long cId, int exitCode, String[] logFiles) throws IOException { ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(application1, 1); - ContainerId containerId = createContainer(appAttemptId1, cId, - containerType); + ContainerId containerId = ContainerId.newContainerId(appAttemptId1, cId); // Simulate log-file creation File appLogDir1 = new File(localLogDir, application1.toString()); @@ -1995,7 +1989,7 @@ private ContainerId finishContainer(ApplicationId application1, writeContainerLogs(appLogDir1, containerId, logFiles); logAggregationService.handle(new LogHandlerContainerFinishedEvent( - containerId, exitCode)); + containerId, containerType, exitCode)); return containerId; } @@ -2185,7 +2179,8 @@ private void testLogAggregationService(boolean retentionSizeLimitation) writeContainerLogs(appLogDir, container, logFiles3); logAggregationService.handle( - new LogHandlerContainerFinishedEvent(container, 0)); + new LogHandlerContainerFinishedEvent(container, + ContainerType.APPLICATION_MASTER, 0)); dispatcher.await(); logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index ec3757e6806..591021f837e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -152,7 +153,8 @@ public void testLogDeletion() throws IOException { logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null)); - logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); + logHandler.handle(new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); logHandler.handle(new LogHandlerAppFinishedEvent(appId)); @@ -192,7 +194,8 @@ public void testDelayedDelete() throws IOException { logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null)); - logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); + logHandler.handle(new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); logHandler.handle(new LogHandlerAppFinishedEvent(appId)); @@ -361,7 +364,8 @@ public void testRecovery() throws Exception { logHandler.start(); logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null)); - logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); + logHandler.handle(new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); logHandler.handle(new LogHandlerAppFinishedEvent(appId)); // simulate a restart and verify deletion is rescheduled