From 5e8a9822090dfb45d93d9cbcf82f8d41e689b995 Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Fri, 1 Jun 2012 21:56:04 +0000 Subject: [PATCH] svn merge -c 1345362. FIXES: MAPREDUCE-4302. NM goes down if error encountered during log aggregation (Daryn Sharp via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1345363 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../ContainerManagerImpl.java | 5 +- .../application/ApplicationEventType.java | 3 +- .../application/ApplicationFinishEvent.java | 46 ++++ .../application/ApplicationImpl.java | 34 ++- .../logaggregation/LogAggregationService.java | 25 ++- .../loghandler/NonAggregatingLogHandler.java | 4 + .../TestLogAggregationService.java | 211 ++++++++++++++---- 8 files changed, 278 insertions(+), 53 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationFinishEvent.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 4ba268ea9b1..4cd444086c5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -434,6 +434,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4297. Usersmap file in gridmix should not fail on empty lines (Ravi Prakash via bobby) + MAPREDUCE-4302. NM goes down if error encountered during log aggregation + (Daryn Sharp via bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index d82186f17c3..4e0e0b42ce1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -524,8 +525,8 @@ public class ContainerManagerImpl extends CompositeService implements (CMgrCompletedAppsEvent) event; for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) { this.dispatcher.getEventHandler().handle( - new ApplicationEvent(appID, - ApplicationEventType.FINISH_APPLICATION)); + new ApplicationFinishEvent(appID, + "Application Killed by ResourceManager")); } break; case FINISH_CONTAINERS: diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java index f988a3e435b..24c9a132b0a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java @@ -23,7 +23,7 @@ public enum ApplicationEventType { // Source: ContainerManager INIT_APPLICATION, INIT_CONTAINER, - FINISH_APPLICATION, + FINISH_APPLICATION, // Source: LogAggregationService if init fails // Source: ResourceLocalizationService APPLICATION_INITED, @@ -33,5 +33,6 @@ public enum ApplicationEventType { APPLICATION_CONTAINER_FINISHED, // Source: Log Handler + APPLICATION_LOG_HANDLING_INITED, APPLICATION_LOG_HANDLING_FINISHED } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationFinishEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationFinishEvent.java new file mode 100644 index 00000000000..0fbdca8e3f2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationFinishEvent.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * Finish/abort event + */ +public class ApplicationFinishEvent extends ApplicationEvent { + private final String diagnostic; + + /** + * Application event to abort all containers associated with the app + * @param appId to abort containers + * @param diagnostic reason for the abort + */ + public ApplicationFinishEvent(ApplicationId appId, String diagnostic) { + super(appId, ApplicationEventType.FINISH_APPLICATION); + this.diagnostic = diagnostic; + } + + /** + * Why the app was aborted + * @return diagnostic message + */ + public String getDiagnostic() { + return diagnostic; + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index d1bcaf2ef9d..2c61b70a076 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -141,6 +141,9 @@ public class ApplicationImpl implements Application { ApplicationState.APPLICATION_RESOURCES_CLEANINGUP), ApplicationEventType.FINISH_APPLICATION, new AppFinishTriggeredTransition()) + .addTransition(ApplicationState.INITING, ApplicationState.INITING, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED, + new AppLogInitDoneTransition()) .addTransition(ApplicationState.INITING, ApplicationState.RUNNING, ApplicationEventType.APPLICATION_INITED, new AppInitDoneTransition()) @@ -192,8 +195,7 @@ public class ApplicationImpl implements Application { /** * Notify services of new application. * - * In particular, this requests that the {@link ResourceLocalizationService} - * localize the application-scoped resources. + * In particular, this initializes the {@link LogAggregationService} */ @SuppressWarnings("unchecked") static class AppInitTransition implements @@ -203,6 +205,27 @@ public class ApplicationImpl implements Application { ApplicationInitEvent initEvent = (ApplicationInitEvent)event; app.applicationACLs = initEvent.getApplicationACLs(); app.aclsManager.addApplication(app.getAppId(), app.applicationACLs); + // Inform the logAggregator + app.dispatcher.getEventHandler().handle( + new LogHandlerAppStartedEvent(app.appId, app.user, + app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS, + app.applicationACLs)); + } + } + + /** + * Handles the APPLICATION_LOG_HANDLING_INITED event that occurs after + * {@link LogAggregationService} has created the directories for the app + * and started the aggregation thread for the app. + * + * In particular, this requests that the {@link ResourceLocalizationService} + * localize the application-scoped resources. + */ + @SuppressWarnings("unchecked") + static class AppLogInitDoneTransition implements + SingleArcTransition { + @Override + public void transition(ApplicationImpl app, ApplicationEvent event) { app.dispatcher.getEventHandler().handle( new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); @@ -248,13 +271,6 @@ public class ApplicationImpl implements Application { SingleArcTransition { @Override public void transition(ApplicationImpl app, ApplicationEvent event) { - - // Inform the logAggregator - app.dispatcher.getEventHandler().handle( - new LogHandlerAppStartedEvent(app.appId, app.user, - app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS, - app.applicationACLs)); - // Start all the containers waiting for ApplicationInit for (Container container : app.containers.values()) { app.dispatcher.getEventHandler().handle(new ContainerInitEvent( diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index f7cf6c53b9a..d00c61ed50e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -49,6 +49,9 @@ import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; @@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LogAggregationService extends AbstractService implements @@ -146,13 +150,13 @@ public class LogAggregationService extends AbstractService implements try { remoteFS = FileSystem.get(conf); } catch (IOException e) { - throw new YarnException("Unable to get Remote FileSystem isntance", e); + throw new YarnException("Unable to get Remote FileSystem instance", e); } boolean remoteExists = false; try { remoteExists = remoteFS.exists(this.remoteRootLogDir); } catch (IOException e) { - throw new YarnException("Failed to check for existance of remoteLogDir [" + throw new YarnException("Failed to check for existence of remoteLogDir [" + this.remoteRootLogDir + "]"); } if (remoteExists) { @@ -266,9 +270,26 @@ public class LogAggregationService extends AbstractService implements } } + @SuppressWarnings("unchecked") private void initApp(final ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, Map appAcls) { + ApplicationEvent eventResponse; + try { + initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls); + eventResponse = new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); + } catch (YarnException e) { + eventResponse = new ApplicationFinishEvent(appId, + "Application failed to init aggregation: " + e.getMessage()); + } + this.dispatcher.getEventHandler().handle(eventResponse); + } + + @VisibleForTesting + public void initAppAggregator(final ApplicationId appId, String user, + Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, + Map appAcls) { // Get user's FileSystem credentials UserGroupInformation userUgi = diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index a90912e6885..3d5ad68cfdb 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -93,6 +93,7 @@ public class NonAggregatingLogHandler extends AbstractService implements super.stop(); } + @SuppressWarnings("unchecked") @Override public void handle(LogHandlerEvent event) { switch (event.getType()) { @@ -101,6 +102,9 @@ public class NonAggregatingLogHandler extends AbstractService implements (LogHandlerAppStartedEvent) event; this.appOwners.put(appStartedEvent.getApplicationId(), appStartedEvent.getUser()); + this.dispatcher.getEventHandler().handle( + new ApplicationEvent(appStartedEvent.getApplicationId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)); break; case CONTAINER_FINISHED: // Ignore diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index ab403357034..2d300310a28 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -18,10 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; @@ -32,6 +29,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -47,6 +45,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; @@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mortbay.util.MultiException; //@Ignore @@ -112,7 +115,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { @Test @SuppressWarnings("unchecked") - public void testLocalFileDeletionAfterUpload() throws IOException { + public void testLocalFileDeletionAfterUpload() throws Exception { this.delSrvc = new DeletionService(createContainerExecutor()); this.delSrvc.init(conf); this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); @@ -170,19 +173,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logFilePath.toUri().getPath()).exists()); dispatcher.await(); - ArgumentCaptor eventCaptor = - ArgumentCaptor.forClass(ApplicationEvent.class); - verify(appEventHandler).handle(eventCaptor.capture()); - assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, - eventCaptor.getValue().getType()); - assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue() - .getApplicationID()); + ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ + new ApplicationEvent( + appAttemptId.getApplicationId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent( + appAttemptId.getApplicationId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) + }; + + checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID"); + dispatcher.stop(); } @Test @SuppressWarnings("unchecked") - public void testNoContainerOnNode() { + public void testNoContainerOnNode() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); @@ -218,19 +225,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest { .exists()); dispatcher.await(); - ArgumentCaptor eventCaptor = - ArgumentCaptor.forClass(ApplicationEvent.class); - verify(appEventHandler).handle(eventCaptor.capture()); - assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, - eventCaptor.getValue().getType()); - verify(appEventHandler).handle(eventCaptor.capture()); - assertEquals(application1, eventCaptor.getValue() - .getApplicationID()); + + ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ + new ApplicationEvent( + application1, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent( + application1, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) + }; + checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID"); + dispatcher.stop(); } @Test @SuppressWarnings("unchecked") - public void testMultipleAppsLogAggregation() throws IOException { + public void testMultipleAppsLogAggregation() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, @@ -299,9 +309,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest { app3LogDir.mkdir(); logAggregationService.handle(new LogHandlerAppStartedEvent(application3, this.user, null, - ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls)); - + ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls)); + ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{ + new ApplicationEvent( + application1, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent( + application2, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent( + application3, + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) + }; + checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID"); + reset(appEventHandler); + ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1); writeContainerLogs(app3LogDir, container31); logAggregationService.handle( @@ -339,22 +362,59 @@ public class TestLogAggregationService extends BaseContainerManagerTest { new ContainerId[] { container31, container32 }); dispatcher.await(); - ArgumentCaptor eventCaptor = - ArgumentCaptor.forClass(ApplicationEvent.class); - - verify(appEventHandler, times(3)).handle(eventCaptor.capture()); - List capturedEvents = eventCaptor.getAllValues(); - Set appIds = new HashSet(); - for (ApplicationEvent cap : capturedEvents) { - assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, - eventCaptor.getValue().getType()); - appIds.add(cap.getApplicationID()); - } - assertTrue(appIds.contains(application1)); - assertTrue(appIds.contains(application2)); - assertTrue(appIds.contains(application3)); + + ApplicationEvent[] expectedFinishedEvents = new ApplicationEvent[]{ + new ApplicationEvent( + application1, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), + new ApplicationEvent( + application2, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), + new ApplicationEvent( + application3, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) + }; + checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID"); + dispatcher.stop(); } - + + @Test + @SuppressWarnings("unchecked") + public void testLogAggregationInitFailsWithoutKillingNM() throws Exception { + + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + + DrainDispatcher dispatcher = createDispatcher(); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + LogAggregationService logAggregationService = spy( + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler)); + logAggregationService.init(this.conf); + logAggregationService.start(); + + ApplicationId appId = BuilderUtils.newApplicationId( + System.currentTimeMillis(), (int)Math.random()); + doThrow(new YarnException("KABOOM!")) + .when(logAggregationService).initAppAggregator( + eq(appId), eq(user), any(Credentials.class), + any(ContainerLogsRetentionPolicy.class), anyMap()); + + logAggregationService.handle(new LogHandlerAppStartedEvent(appId, + this.user, null, + ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls)); + + dispatcher.await(); + ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ + new ApplicationFinishEvent(appId, "Application failed to init aggregation: KABOOM!") + }; + checkEvents(appEventHandler, expectedEvents, false, + "getType", "getApplicationID", "getDiagnostic"); + } + private void writeContainerLogs(File appLogDir, ContainerId containerId) throws IOException { // ContainerLogDir should be created @@ -599,4 +659,77 @@ public class TestLogAggregationService extends BaseContainerManagerTest { Assert.assertEquals("Log aggregator failed to cleanup!", 0, logAggregationService.getNumAggregators()); } + + @SuppressWarnings("unchecked") + private static > + void checkEvents(EventHandler eventHandler, + T expectedEvents[], boolean inOrder, + String... methods) throws Exception { + Class genericClass = (Class)expectedEvents.getClass().getComponentType(); + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(genericClass); + // captor work work unless used via a verify + verify(eventHandler, atLeast(0)).handle(eventCaptor.capture()); + List actualEvents = eventCaptor.getAllValues(); + + // batch up exceptions so junit presents them as one + MultiException failures = new MultiException(); + try { + assertEquals("expected events", expectedEvents.length, actualEvents.size()); + } catch (Throwable e) { + failures.add(e); + } + if (inOrder) { + // sequentially verify the events + int len = Math.max(expectedEvents.length, actualEvents.size()); + for (int n=0; n < len; n++) { + try { + String expect = (n < expectedEvents.length) + ? eventToString(expectedEvents[n], methods) : null; + String actual = (n < actualEvents.size()) + ? eventToString(actualEvents.get(n), methods) : null; + assertEquals("event#"+n, expect, actual); + } catch (Throwable e) { + failures.add(e); + } + } + } else { + // verify the actual events were expected + // verify no expected events were not seen + Set expectedSet = new HashSet(); + for (T expectedEvent : expectedEvents) { + expectedSet.add(eventToString(expectedEvent, methods)); + } + for (T actualEvent : actualEvents) { + try { + String actual = eventToString(actualEvent, methods); + assertTrue("unexpected event: "+actual, expectedSet.remove(actual)); + } catch (Throwable e) { + failures.add(e); + } + } + for (String expected : expectedSet) { + try { + Assert.fail("missing event: "+expected); + } catch (Throwable e) { + failures.add(e); + } + } + } + failures.ifExceptionThrow(); + } + + private static String eventToString(Event event, String[] methods) throws Exception { + StringBuilder sb = new StringBuilder("[ "); + for (String m : methods) { + try { + Method method = event.getClass().getMethod(m); + String value = method.invoke(event).toString(); + sb.append(method.getName()).append("=").append(value).append(" "); + } catch (Exception e) { + // ignore, actual event may not implement the method... + } + } + sb.append("]"); + return sb.toString(); + } }