YARN-4984. LogAggregationService shouldn't swallow exception in handling createAppDir() which cause thread leak. (Junping Du via wangda)

This commit is contained in:
Wangda Tan 2016-05-04 11:38:55 -07:00
parent e61d431275
commit 7bd418e48c
2 changed files with 11 additions and 7 deletions

View File

@ -376,6 +376,9 @@ public class LogAggregationService extends AbstractService implements
} else { } else {
appDirException = (YarnRuntimeException)e; appDirException = (YarnRuntimeException)e;
} }
appLogAggregators.remove(appId);
closeFileSystems(userUgi);
throw appDirException;
} }
// TODO Get the user configuration for the list of containers that need log // TODO Get the user configuration for the list of containers that need log
@ -393,10 +396,6 @@ public class LogAggregationService extends AbstractService implements
} }
}; };
this.threadPool.execute(aggregatorWrapper); this.threadPool.execute(aggregatorWrapper);
if (appDirException != null) {
throw appDirException;
}
} }
protected void closeFileSystems(final UserGroupInformation userUgi) { protected void closeFileSystems(final UserGroupInformation userUgi) {

View File

@ -777,8 +777,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
dispatcher.await(); dispatcher.await();
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
new ApplicationEvent(appId, new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED) ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)
}; };
checkEvents(appEventHandler, expectedEvents, false, checkEvents(appEventHandler, expectedEvents, false,
"getType", "getApplicationID", "getDiagnostic"); "getType", "getApplicationID", "getDiagnostic");
@ -794,10 +794,15 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.stop(); logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators()); assertEquals(0, logAggregationService.getNumAggregators());
verify(spyDelSrvc).delete(eq(user), any(Path.class), // local log dir shouldn't be deleted given log aggregation cannot
// continue due to aggregated log dir creation failure on remoteFS.
verify(spyDelSrvc, never()).delete(eq(user), any(Path.class),
Mockito.<Path>anyVararg()); Mockito.<Path>anyVararg());
verify(logAggregationService).closeFileSystems( verify(logAggregationService).closeFileSystems(
any(UserGroupInformation.class)); any(UserGroupInformation.class));
// make sure local log dir is not deleted in case log aggregation
// service cannot be initiated.
assertTrue(appLogDir.exists());
} }
private void writeContainerLogs(File appLogDir, ContainerId containerId, private void writeContainerLogs(File appLogDir, ContainerId containerId,