YARN-4984. LogAggregationService shouldn't swallow exception in handling createAppDir() which cause thread leak. (Junping Du via wangda)
(cherry picked from commit 7bd418e48c
)
This commit is contained in:
parent
1ffb0c43d6
commit
585299146a
|
@ -376,6 +376,9 @@ public class LogAggregationService extends AbstractService implements
|
||||||
} else {
|
} else {
|
||||||
appDirException = (YarnRuntimeException)e;
|
appDirException = (YarnRuntimeException)e;
|
||||||
}
|
}
|
||||||
|
appLogAggregators.remove(appId);
|
||||||
|
closeFileSystems(userUgi);
|
||||||
|
throw appDirException;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO Get the user configuration for the list of containers that need log
|
// TODO Get the user configuration for the list of containers that need log
|
||||||
|
@ -393,10 +396,6 @@ public class LogAggregationService extends AbstractService implements
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
this.threadPool.execute(aggregatorWrapper);
|
this.threadPool.execute(aggregatorWrapper);
|
||||||
|
|
||||||
if (appDirException != null) {
|
|
||||||
throw appDirException;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void closeFileSystems(final UserGroupInformation userUgi) {
|
protected void closeFileSystems(final UserGroupInformation userUgi) {
|
||||||
|
|
|
@ -778,7 +778,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
|
ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
|
||||||
new ApplicationEvent(appId,
|
new ApplicationEvent(appId,
|
||||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)
|
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)
|
||||||
};
|
};
|
||||||
checkEvents(appEventHandler, expectedEvents, false,
|
checkEvents(appEventHandler, expectedEvents, false,
|
||||||
"getType", "getApplicationID", "getDiagnostic");
|
"getType", "getApplicationID", "getDiagnostic");
|
||||||
|
@ -794,10 +794,15 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
|
|
||||||
logAggregationService.stop();
|
logAggregationService.stop();
|
||||||
assertEquals(0, logAggregationService.getNumAggregators());
|
assertEquals(0, logAggregationService.getNumAggregators());
|
||||||
verify(spyDelSrvc).delete(eq(user), any(Path.class),
|
// local log dir shouldn't be deleted given log aggregation cannot
|
||||||
|
// continue due to aggregated log dir creation failure on remoteFS.
|
||||||
|
verify(spyDelSrvc, never()).delete(eq(user), any(Path.class),
|
||||||
Mockito.<Path>anyVararg());
|
Mockito.<Path>anyVararg());
|
||||||
verify(logAggregationService).closeFileSystems(
|
verify(logAggregationService).closeFileSystems(
|
||||||
any(UserGroupInformation.class));
|
any(UserGroupInformation.class));
|
||||||
|
// make sure local log dir is not deleted in case log aggregation
|
||||||
|
// service cannot be initiated.
|
||||||
|
assertTrue(appLogDir.exists());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeContainerLogs(File appLogDir, ContainerId containerId,
|
private void writeContainerLogs(File appLogDir, ContainerId containerId,
|
||||||
|
|
Loading…
Reference in New Issue