YARN-4325. Nodemanager log handlers fail to send finished/failed events in some cases. Contributed by Junping Du

(cherry picked from commit 81effb7dcd)
This commit is contained in:
Jason Lowe 2016-05-16 15:40:23 +00:00
parent a37b3694ea
commit 575c056357
6 changed files with 155 additions and 8 deletions

View File

@ -207,18 +207,18 @@ public class ApplicationImpl implements Application {
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
ApplicationEventType.APPLICATION_INITED, ApplicationEventType.APPLICATION_INITED,
ApplicationEventType.FINISH_APPLICATION)) ApplicationEventType.FINISH_APPLICATION))
// Transitions from FINISHED state // Transitions from FINISHED state
.addTransition(ApplicationState.FINISHED, .addTransition(ApplicationState.FINISHED,
ApplicationState.FINISHED, ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED),
new AppLogsAggregatedTransition()) new AppLogsAggregatedTransition())
.addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED, .addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED,
EnumSet.of( EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
ApplicationEventType.FINISH_APPLICATION)) ApplicationEventType.FINISH_APPLICATION))
// create the topology tables // create the topology tables
.installTopology(); .installTopology();

View File

@ -501,6 +501,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
} }
} }
@SuppressWarnings("unchecked")
@Override @Override
public void run() { public void run() {
try { try {
@ -513,6 +514,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
} finally { } finally {
if (!this.appAggregationFinished.get()) { if (!this.appAggregationFinished.get()) {
LOG.warn("Aggregation did not complete for application " + appId); LOG.warn("Aggregation did not complete for application " + appId);
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
} }
this.appAggregationFinished.set(true); this.appAggregationFinished.set(true);
} }

View File

@ -416,7 +416,6 @@ public class LogAggregationService extends AbstractService implements
// A container is complete. Put this containers' logs up for aggregation if // A container is complete. Put this containers' logs up for aggregation if
// this containers' logs are needed. // this containers' logs are needed.
AppLogAggregator aggregator = this.appLogAggregators.get( AppLogAggregator aggregator = this.appLogAggregators.get(
containerId.getApplicationAttemptId().getApplicationId()); containerId.getApplicationAttemptId().getApplicationId());
if (aggregator == null) { if (aggregator == null) {
@ -436,6 +435,7 @@ public class LogAggregationService extends AbstractService implements
new ContainerLogContext(containerId, containerType, exitCode)); new ContainerLogContext(containerId, containerType, exitCode));
} }
@SuppressWarnings("unchecked")
private void stopApp(ApplicationId appId) { private void stopApp(ApplicationId appId) {
// App is complete. Finish up any containers' pending log aggregation and // App is complete. Finish up any containers' pending log aggregation and
@ -445,6 +445,9 @@ public class LogAggregationService extends AbstractService implements
if (aggregator == null) { if (aggregator == null) {
LOG.warn("Log aggregation is not initialized for " + appId LOG.warn("Log aggregation is not initialized for " + appId
+ ", did it fail to start?"); + ", did it fail to start?");
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
return; return;
} }
aggregator.finishLogAggregation(); aggregator.finishLogAggregation();

View File

@ -171,6 +171,10 @@ public class NonAggregatingLogHandler extends AbstractService implements
String user = appOwners.remove(appId); String user = appOwners.remove(appId);
if (user == null) { if (user == null) {
LOG.error("Unable to locate user for " + appId); LOG.error("Unable to locate user for " + appId);
// send LOG_HANDLING_FAILED out
NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
break; break;
} }
LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId); LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId);

View File

@ -294,6 +294,90 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
cm.stop(); cm.stop();
} }
@Test
public void testNMRecoveryForAppFinishedWithLogAggregationFailure()
throws Exception {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
NMStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
Context context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context);
cm.init(conf);
cm.start();
// add an application by starting a container
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
Map<String, String> containerEnv = Collections.emptyMap();
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, containerEnv, containerCmds, serviceData,
null, null);
StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, null);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
Application app = context.getApplications().get(appId);
assertNotNull(app);
waitForAppState(app, ApplicationState.INITING);
// simulate application completion
List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
finishedApps.add(appId);
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
app.handle(new ApplicationEvent(app.getAppId(),
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
assertEquals(app.getApplicationState(), ApplicationState.FINISHED);
// application is still in NM context.
assertEquals(1, context.getApplications().size());
// restart and verify app is still there and marked as finished.
cm.stop();
context = createContext(conf, stateStore);
cm = createContainerManager(context);
cm.init(conf);
cm.start();
assertEquals(1, context.getApplications().size());
app = context.getApplications().get(appId);
assertNotNull(app);
// no longer saving FINISH_APP event in NM stateStore,
// simulate by resending FINISH_APP event
cm.handle(new CMgrCompletedAppsEvent(finishedApps,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
// TODO need to figure out why additional APPLICATION_RESOURCES_CLEANEDUP
// is needed.
app.handle(new ApplicationEvent(app.getAppId(),
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
assertEquals(app.getApplicationState(), ApplicationState.FINISHED);
// simulate log aggregation failed.
app.handle(new ApplicationEvent(app.getAppId(),
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
// restart and verify app is no longer present after recovery
cm.stop();
context = createContext(conf, stateStore);
cm = createContainerManager(context);
cm.init(conf);
cm.start();
assertTrue(context.getApplications().isEmpty());
cm.stop();
}
@Test @Test
public void testContainerResizeRecovery() throws Exception { public void testContainerResizeRecovery() throws Exception {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);

View File

@ -17,6 +17,9 @@
*/ */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -84,7 +87,7 @@ public class TestNonAggregatingLogHandler {
DeletionService mockDelService; DeletionService mockDelService;
Configuration conf; Configuration conf;
DrainDispatcher dispatcher; DrainDispatcher dispatcher;
EventHandler<ApplicationEvent> appEventHandler; private ApplicationEventHandler appEventHandler;
String user = "testuser"; String user = "testuser";
ApplicationId appId; ApplicationId appId;
ApplicationAttemptId appAttemptId; ApplicationAttemptId appAttemptId;
@ -97,7 +100,7 @@ public class TestNonAggregatingLogHandler {
mockDelService = mock(DeletionService.class); mockDelService = mock(DeletionService.class);
conf = new YarnConfiguration(); conf = new YarnConfiguration();
dispatcher = createDispatcher(conf); dispatcher = createDispatcher(conf);
appEventHandler = mock(EventHandler.class); appEventHandler = new ApplicationEventHandler();
dispatcher.register(ApplicationEventType.class, appEventHandler); dispatcher.register(ApplicationEventType.class, appEventHandler);
appId = BuilderUtils.newApplicationId(1234, 1); appId = BuilderUtils.newApplicationId(1234, 1);
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1); appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1);
@ -345,6 +348,9 @@ public class TestNonAggregatingLogHandler {
dirsHandler.init(conf); dirsHandler.init(conf);
appEventHandler.resetLogHandlingEvent();
assertFalse(appEventHandler.receiveLogHandlingFinishEvent());
NMStateStoreService stateStore = new NMMemoryStateStoreService(); NMStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf); stateStore.init(conf);
stateStore.start(); stateStore.start();
@ -377,8 +383,21 @@ public class TestNonAggregatingLogHandler {
logHandler.start(); logHandler.start();
verify(logHandler.mockSched, never()).schedule(any(Runnable.class), verify(logHandler.mockSched, never()).schedule(any(Runnable.class),
anyLong(), any(TimeUnit.class)); anyLong(), any(TimeUnit.class));
// wait events get drained.
this.dispatcher.await();
assertTrue(appEventHandler.receiveLogHandlingFinishEvent());
appEventHandler.resetLogHandlingEvent();
assertFalse(appEventHandler.receiveLogHandlingFailedEvent());
// send an app finish event against a removed app
logHandler.handle(new LogHandlerAppFinishedEvent(appId));
this.dispatcher.await();
// verify to receive a log failed event.
assertTrue(appEventHandler.receiveLogHandlingFailedEvent());
assertFalse(appEventHandler.receiveLogHandlingFinishEvent());
logHandler.close(); logHandler.close();
} }
/** /**
* Function to run a log handler with directories failing the getFileStatus * Function to run a log handler with directories failing the getFileStatus
@ -536,4 +555,37 @@ public class TestNonAggregatingLogHandler {
} }
return dirs; return dirs;
} }
class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
private boolean logHandlingFinished = false;
private boolean logHandlingFailed = false;
@Override
public void handle(ApplicationEvent event) {
switch (event.getType()) {
case APPLICATION_LOG_HANDLING_FINISHED:
logHandlingFinished = true;
break;
case APPLICATION_LOG_HANDLING_FAILED:
logHandlingFailed = true;
default:
// do nothing.
}
}
public boolean receiveLogHandlingFinishEvent() {
return logHandlingFinished;
}
public boolean receiveLogHandlingFailedEvent() {
return logHandlingFailed;
}
public void resetLogHandlingEvent() {
logHandlingFinished = false;
logHandlingFailed = false;
}
}
} }