MAPREDUCE-4302. NM goes down if error encountered during log aggregation (Daryn Sharp via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1345362 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ca90e296ad
commit
1cf6010675
|
@ -550,6 +550,9 @@ Release 0.23.3 - UNRELEASED
|
|||
MAPREDUCE-4297. Usersmap file in gridmix should not fail on empty lines
|
||||
(Ravi Prakash via bobby)
|
||||
|
||||
MAPREDUCE-4302. NM goes down if error encountered during log aggregation
|
||||
(Daryn Sharp via bobby)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
|
@ -524,8 +525,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
(CMgrCompletedAppsEvent) event;
|
||||
for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
|
||||
this.dispatcher.getEventHandler().handle(
|
||||
new ApplicationEvent(appID,
|
||||
ApplicationEventType.FINISH_APPLICATION));
|
||||
new ApplicationFinishEvent(appID,
|
||||
"Application Killed by ResourceManager"));
|
||||
}
|
||||
break;
|
||||
case FINISH_CONTAINERS:
|
||||
|
|
|
@ -23,7 +23,7 @@ public enum ApplicationEventType {
|
|||
// Source: ContainerManager
|
||||
INIT_APPLICATION,
|
||||
INIT_CONTAINER,
|
||||
FINISH_APPLICATION,
|
||||
FINISH_APPLICATION, // Source: LogAggregationService if init fails
|
||||
|
||||
// Source: ResourceLocalizationService
|
||||
APPLICATION_INITED,
|
||||
|
@ -33,5 +33,6 @@ public enum ApplicationEventType {
|
|||
APPLICATION_CONTAINER_FINISHED,
|
||||
|
||||
// Source: Log Handler
|
||||
APPLICATION_LOG_HANDLING_INITED,
|
||||
APPLICATION_LOG_HANDLING_FINISHED
|
||||
}
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
||||
/**
|
||||
* Finish/abort event
|
||||
*/
|
||||
public class ApplicationFinishEvent extends ApplicationEvent {
|
||||
private final String diagnostic;
|
||||
|
||||
/**
|
||||
* Application event to abort all containers associated with the app
|
||||
* @param appId to abort containers
|
||||
* @param diagnostic reason for the abort
|
||||
*/
|
||||
public ApplicationFinishEvent(ApplicationId appId, String diagnostic) {
|
||||
super(appId, ApplicationEventType.FINISH_APPLICATION);
|
||||
this.diagnostic = diagnostic;
|
||||
}
|
||||
|
||||
/**
|
||||
* Why the app was aborted
|
||||
* @return diagnostic message
|
||||
*/
|
||||
public String getDiagnostic() {
|
||||
return diagnostic;
|
||||
}
|
||||
}
|
|
@ -141,6 +141,9 @@ public class ApplicationImpl implements Application {
|
|||
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
|
||||
ApplicationEventType.FINISH_APPLICATION,
|
||||
new AppFinishTriggeredTransition())
|
||||
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
|
||||
new AppLogInitDoneTransition())
|
||||
.addTransition(ApplicationState.INITING, ApplicationState.RUNNING,
|
||||
ApplicationEventType.APPLICATION_INITED,
|
||||
new AppInitDoneTransition())
|
||||
|
@ -192,8 +195,7 @@ public class ApplicationImpl implements Application {
|
|||
/**
|
||||
* Notify services of new application.
|
||||
*
|
||||
* In particular, this requests that the {@link ResourceLocalizationService}
|
||||
* localize the application-scoped resources.
|
||||
* In particular, this initializes the {@link LogAggregationService}
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
static class AppInitTransition implements
|
||||
|
@ -203,6 +205,27 @@ public class ApplicationImpl implements Application {
|
|||
ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
|
||||
app.applicationACLs = initEvent.getApplicationACLs();
|
||||
app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
|
||||
// Inform the logAggregator
|
||||
app.dispatcher.getEventHandler().handle(
|
||||
new LogHandlerAppStartedEvent(app.appId, app.user,
|
||||
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
|
||||
app.applicationACLs));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the APPLICATION_LOG_HANDLING_INITED event that occurs after
|
||||
* {@link LogAggregationService} has created the directories for the app
|
||||
* and started the aggregation thread for the app.
|
||||
*
|
||||
* In particular, this requests that the {@link ResourceLocalizationService}
|
||||
* localize the application-scoped resources.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
static class AppLogInitDoneTransition implements
|
||||
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
|
||||
@Override
|
||||
public void transition(ApplicationImpl app, ApplicationEvent event) {
|
||||
app.dispatcher.getEventHandler().handle(
|
||||
new ApplicationLocalizationEvent(
|
||||
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
|
||||
|
@ -248,13 +271,6 @@ public class ApplicationImpl implements Application {
|
|||
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
|
||||
@Override
|
||||
public void transition(ApplicationImpl app, ApplicationEvent event) {
|
||||
|
||||
// Inform the logAggregator
|
||||
app.dispatcher.getEventHandler().handle(
|
||||
new LogHandlerAppStartedEvent(app.appId, app.user,
|
||||
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
|
||||
app.applicationACLs));
|
||||
|
||||
// Start all the containers waiting for ApplicationInit
|
||||
for (Container container : app.containers.values()) {
|
||||
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
|
||||
|
|
|
@ -49,6 +49,9 @@ import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||
|
@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
public class LogAggregationService extends AbstractService implements
|
||||
|
@ -146,13 +150,13 @@ public class LogAggregationService extends AbstractService implements
|
|||
try {
|
||||
remoteFS = FileSystem.get(conf);
|
||||
} catch (IOException e) {
|
||||
throw new YarnException("Unable to get Remote FileSystem isntance", e);
|
||||
throw new YarnException("Unable to get Remote FileSystem instance", e);
|
||||
}
|
||||
boolean remoteExists = false;
|
||||
try {
|
||||
remoteExists = remoteFS.exists(this.remoteRootLogDir);
|
||||
} catch (IOException e) {
|
||||
throw new YarnException("Failed to check for existance of remoteLogDir ["
|
||||
throw new YarnException("Failed to check for existence of remoteLogDir ["
|
||||
+ this.remoteRootLogDir + "]");
|
||||
}
|
||||
if (remoteExists) {
|
||||
|
@ -266,9 +270,26 @@ public class LogAggregationService extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void initApp(final ApplicationId appId, String user,
|
||||
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
|
||||
Map<ApplicationAccessType, String> appAcls) {
|
||||
ApplicationEvent eventResponse;
|
||||
try {
|
||||
initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
|
||||
eventResponse = new ApplicationEvent(appId,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
|
||||
} catch (YarnException e) {
|
||||
eventResponse = new ApplicationFinishEvent(appId,
|
||||
"Application failed to init aggregation: " + e.getMessage());
|
||||
}
|
||||
this.dispatcher.getEventHandler().handle(eventResponse);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void initAppAggregator(final ApplicationId appId, String user,
|
||||
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
|
||||
Map<ApplicationAccessType, String> appAcls) {
|
||||
|
||||
// Get user's FileSystem credentials
|
||||
UserGroupInformation userUgi =
|
||||
|
|
|
@ -93,6 +93,7 @@ public class NonAggregatingLogHandler extends AbstractService implements
|
|||
super.stop();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void handle(LogHandlerEvent event) {
|
||||
switch (event.getType()) {
|
||||
|
@ -101,6 +102,9 @@ public class NonAggregatingLogHandler extends AbstractService implements
|
|||
(LogHandlerAppStartedEvent) event;
|
||||
this.appOwners.put(appStartedEvent.getApplicationId(),
|
||||
appStartedEvent.getUser());
|
||||
this.dispatcher.getEventHandler().handle(
|
||||
new ApplicationEvent(appStartedEvent.getApplicationId(),
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED));
|
||||
break;
|
||||
case CONTAINER_FINISHED:
|
||||
// Ignore
|
||||
|
|
|
@ -18,10 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static junit.framework.Assert.assertEquals;
|
||||
import static junit.framework.Assert.assertTrue;
|
||||
|
||||
|
@ -32,6 +29,7 @@ import java.io.FileWriter;
|
|||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.Writer;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
|
@ -47,6 +45,8 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
|
@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||
|
@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
|
|||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mortbay.util.MultiException;
|
||||
|
||||
|
||||
//@Ignore
|
||||
|
@ -112,7 +115,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testLocalFileDeletionAfterUpload() throws IOException {
|
||||
public void testLocalFileDeletionAfterUpload() throws Exception {
|
||||
this.delSrvc = new DeletionService(createContainerExecutor());
|
||||
this.delSrvc.init(conf);
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
|
@ -170,19 +173,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
logFilePath.toUri().getPath()).exists());
|
||||
|
||||
dispatcher.await();
|
||||
ArgumentCaptor<ApplicationEvent> eventCaptor =
|
||||
ArgumentCaptor.forClass(ApplicationEvent.class);
|
||||
verify(appEventHandler).handle(eventCaptor.capture());
|
||||
assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
|
||||
eventCaptor.getValue().getType());
|
||||
assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue()
|
||||
.getApplicationID());
|
||||
|
||||
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
|
||||
new ApplicationEvent(
|
||||
appAttemptId.getApplicationId(),
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
|
||||
new ApplicationEvent(
|
||||
appAttemptId.getApplicationId(),
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
|
||||
};
|
||||
|
||||
checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
|
||||
dispatcher.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testNoContainerOnNode() {
|
||||
public void testNoContainerOnNode() throws Exception {
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
this.remoteRootLogDir.getAbsolutePath());
|
||||
|
@ -218,19 +225,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
.exists());
|
||||
|
||||
dispatcher.await();
|
||||
ArgumentCaptor<ApplicationEvent> eventCaptor =
|
||||
ArgumentCaptor.forClass(ApplicationEvent.class);
|
||||
verify(appEventHandler).handle(eventCaptor.capture());
|
||||
assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
|
||||
eventCaptor.getValue().getType());
|
||||
verify(appEventHandler).handle(eventCaptor.capture());
|
||||
assertEquals(application1, eventCaptor.getValue()
|
||||
.getApplicationID());
|
||||
|
||||
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
|
||||
new ApplicationEvent(
|
||||
application1,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
|
||||
new ApplicationEvent(
|
||||
application1,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
|
||||
};
|
||||
checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
|
||||
dispatcher.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testMultipleAppsLogAggregation() throws IOException {
|
||||
public void testMultipleAppsLogAggregation() throws Exception {
|
||||
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
|
@ -299,9 +309,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
app3LogDir.mkdir();
|
||||
logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
|
||||
this.user, null,
|
||||
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
|
||||
|
||||
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
|
||||
|
||||
ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{
|
||||
new ApplicationEvent(
|
||||
application1,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
|
||||
new ApplicationEvent(
|
||||
application2,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
|
||||
new ApplicationEvent(
|
||||
application3,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)
|
||||
};
|
||||
checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID");
|
||||
reset(appEventHandler);
|
||||
|
||||
ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
|
||||
writeContainerLogs(app3LogDir, container31);
|
||||
logAggregationService.handle(
|
||||
|
@ -339,22 +362,59 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
new ContainerId[] { container31, container32 });
|
||||
|
||||
dispatcher.await();
|
||||
ArgumentCaptor<ApplicationEvent> eventCaptor =
|
||||
ArgumentCaptor.forClass(ApplicationEvent.class);
|
||||
|
||||
verify(appEventHandler, times(3)).handle(eventCaptor.capture());
|
||||
List<ApplicationEvent> capturedEvents = eventCaptor.getAllValues();
|
||||
Set<ApplicationId> appIds = new HashSet<ApplicationId>();
|
||||
for (ApplicationEvent cap : capturedEvents) {
|
||||
assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
|
||||
eventCaptor.getValue().getType());
|
||||
appIds.add(cap.getApplicationID());
|
||||
}
|
||||
assertTrue(appIds.contains(application1));
|
||||
assertTrue(appIds.contains(application2));
|
||||
assertTrue(appIds.contains(application3));
|
||||
|
||||
ApplicationEvent[] expectedFinishedEvents = new ApplicationEvent[]{
|
||||
new ApplicationEvent(
|
||||
application1,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED),
|
||||
new ApplicationEvent(
|
||||
application2,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED),
|
||||
new ApplicationEvent(
|
||||
application3,
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
|
||||
};
|
||||
checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID");
|
||||
dispatcher.stop();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
|
||||
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
this.remoteRootLogDir.getAbsolutePath());
|
||||
|
||||
DrainDispatcher dispatcher = createDispatcher();
|
||||
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
||||
dispatcher.register(ApplicationEventType.class, appEventHandler);
|
||||
|
||||
LogAggregationService logAggregationService = spy(
|
||||
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||
super.dirsHandler));
|
||||
logAggregationService.init(this.conf);
|
||||
logAggregationService.start();
|
||||
|
||||
ApplicationId appId = BuilderUtils.newApplicationId(
|
||||
System.currentTimeMillis(), (int)Math.random());
|
||||
doThrow(new YarnException("KABOOM!"))
|
||||
.when(logAggregationService).initAppAggregator(
|
||||
eq(appId), eq(user), any(Credentials.class),
|
||||
any(ContainerLogsRetentionPolicy.class), anyMap());
|
||||
|
||||
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
|
||||
this.user, null,
|
||||
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
|
||||
|
||||
dispatcher.await();
|
||||
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
|
||||
new ApplicationFinishEvent(appId, "Application failed to init aggregation: KABOOM!")
|
||||
};
|
||||
checkEvents(appEventHandler, expectedEvents, false,
|
||||
"getType", "getApplicationID", "getDiagnostic");
|
||||
}
|
||||
|
||||
private void writeContainerLogs(File appLogDir, ContainerId containerId)
|
||||
throws IOException {
|
||||
// ContainerLogDir should be created
|
||||
|
@ -599,4 +659,77 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
Assert.assertEquals("Log aggregator failed to cleanup!", 0,
|
||||
logAggregationService.getNumAggregators());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T extends Event<?>>
|
||||
void checkEvents(EventHandler<T> eventHandler,
|
||||
T expectedEvents[], boolean inOrder,
|
||||
String... methods) throws Exception {
|
||||
Class<T> genericClass = (Class<T>)expectedEvents.getClass().getComponentType();
|
||||
ArgumentCaptor<T> eventCaptor = ArgumentCaptor.forClass(genericClass);
|
||||
// captor work work unless used via a verify
|
||||
verify(eventHandler, atLeast(0)).handle(eventCaptor.capture());
|
||||
List<T> actualEvents = eventCaptor.getAllValues();
|
||||
|
||||
// batch up exceptions so junit presents them as one
|
||||
MultiException failures = new MultiException();
|
||||
try {
|
||||
assertEquals("expected events", expectedEvents.length, actualEvents.size());
|
||||
} catch (Throwable e) {
|
||||
failures.add(e);
|
||||
}
|
||||
if (inOrder) {
|
||||
// sequentially verify the events
|
||||
int len = Math.max(expectedEvents.length, actualEvents.size());
|
||||
for (int n=0; n < len; n++) {
|
||||
try {
|
||||
String expect = (n < expectedEvents.length)
|
||||
? eventToString(expectedEvents[n], methods) : null;
|
||||
String actual = (n < actualEvents.size())
|
||||
? eventToString(actualEvents.get(n), methods) : null;
|
||||
assertEquals("event#"+n, expect, actual);
|
||||
} catch (Throwable e) {
|
||||
failures.add(e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// verify the actual events were expected
|
||||
// verify no expected events were not seen
|
||||
Set<String> expectedSet = new HashSet<String>();
|
||||
for (T expectedEvent : expectedEvents) {
|
||||
expectedSet.add(eventToString(expectedEvent, methods));
|
||||
}
|
||||
for (T actualEvent : actualEvents) {
|
||||
try {
|
||||
String actual = eventToString(actualEvent, methods);
|
||||
assertTrue("unexpected event: "+actual, expectedSet.remove(actual));
|
||||
} catch (Throwable e) {
|
||||
failures.add(e);
|
||||
}
|
||||
}
|
||||
for (String expected : expectedSet) {
|
||||
try {
|
||||
Assert.fail("missing event: "+expected);
|
||||
} catch (Throwable e) {
|
||||
failures.add(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
failures.ifExceptionThrow();
|
||||
}
|
||||
|
||||
private static String eventToString(Event<?> event, String[] methods) throws Exception {
|
||||
StringBuilder sb = new StringBuilder("[ ");
|
||||
for (String m : methods) {
|
||||
try {
|
||||
Method method = event.getClass().getMethod(m);
|
||||
String value = method.invoke(event).toString();
|
||||
sb.append(method.getName()).append("=").append(value).append(" ");
|
||||
} catch (Exception e) {
|
||||
// ignore, actual event may not implement the method...
|
||||
}
|
||||
}
|
||||
sb.append("]");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue