YARN-4773. Log aggregation performs extraneous filesystem operations when rolling log aggregation is disabled. Contributed by Jun Gong
(cherry picked from commit 948b758070
)
This commit is contained in:
parent
cb79533777
commit
35f9cfda61
|
@ -124,11 +124,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
private final long rollingMonitorInterval;
|
private final long rollingMonitorInterval;
|
||||||
private final boolean logAggregationInRolling;
|
private final boolean logAggregationInRolling;
|
||||||
private final NodeId nodeId;
|
private final NodeId nodeId;
|
||||||
// This variable is only for testing
|
|
||||||
private final AtomicBoolean waiting = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
// This variable is only for testing
|
// These variables are only for testing
|
||||||
|
private final AtomicBoolean waiting = new AtomicBoolean(false);
|
||||||
private int logAggregationTimes = 0;
|
private int logAggregationTimes = 0;
|
||||||
|
private int cleanupOldLogTimes = 0;
|
||||||
|
|
||||||
private boolean renameTemporaryLogFileFailed = false;
|
private boolean renameTemporaryLogFileFailed = false;
|
||||||
|
|
||||||
|
@ -365,8 +365,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
|
|
||||||
// Before upload logs, make sure the number of existing logs
|
// Before upload logs, make sure the number of existing logs
|
||||||
// is smaller than the configured NM log aggregation retention size.
|
// is smaller than the configured NM log aggregation retention size.
|
||||||
if (uploadedLogsInThisCycle) {
|
if (uploadedLogsInThisCycle && logAggregationInRolling) {
|
||||||
cleanOldLogs();
|
cleanOldLogs();
|
||||||
|
cleanupOldLogTimes++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (writer != null) {
|
if (writer != null) {
|
||||||
|
@ -689,4 +690,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
public int getLogAggregationTimes() {
|
public int getLogAggregationTimes() {
|
||||||
return this.logAggregationTimes;
|
return this.logAggregationTimes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getCleanupOldLogTimes() {
|
||||||
|
return this.cleanupOldLogTimes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2133,7 +2133,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
Records.newRecord(LogAggregationContext.class);
|
Records.newRecord(LogAggregationContext.class);
|
||||||
logAggregationContext.setLogAggregationPolicyClassName(
|
logAggregationContext.setLogAggregationPolicyClassName(
|
||||||
FailedOrKilledContainerLogAggregationPolicy.class.getName());
|
FailedOrKilledContainerLogAggregationPolicy.class.getName());
|
||||||
verifySkipUnnecessaryNNOperations(logAggregationContext, 0, 2);
|
verifySkipUnnecessaryNNOperations(logAggregationContext, 0, 2, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 20000)
|
@Test (timeout = 20000)
|
||||||
|
@ -2147,13 +2147,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
AMOnlyLogAggregationPolicy.class.getName());
|
AMOnlyLogAggregationPolicy.class.getName());
|
||||||
contextWithAMOnly.setRolledLogsIncludePattern("sys*");
|
contextWithAMOnly.setRolledLogsIncludePattern("sys*");
|
||||||
contextWithAMOnly.setRolledLogsExcludePattern("std_final");
|
contextWithAMOnly.setRolledLogsExcludePattern("std_final");
|
||||||
verifySkipUnnecessaryNNOperations(contextWithAMOnly, 1, 4);
|
verifySkipUnnecessaryNNOperations(contextWithAMOnly, 1, 4, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifySkipUnnecessaryNNOperations(
|
private void verifySkipUnnecessaryNNOperations(
|
||||||
LogAggregationContext logAggregationContext,
|
LogAggregationContext logAggregationContext,
|
||||||
int expectedLogAggregationTimes, int expectedAggregationReportNum)
|
int expectedLogAggregationTimes, int expectedAggregationReportNum,
|
||||||
throws Exception {
|
int expectedCleanupOldLogsTimes) throws Exception {
|
||||||
LogAggregationService logAggregationService = new LogAggregationService(
|
LogAggregationService logAggregationService = new LogAggregationService(
|
||||||
dispatcher, this.context, this.delSrvc, super.dirsHandler);
|
dispatcher, this.context, this.delSrvc, super.dirsHandler);
|
||||||
logAggregationService.init(this.conf);
|
logAggregationService.init(this.conf);
|
||||||
|
@ -2164,7 +2164,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
null, this.acls, logAggregationContext));
|
null, this.acls, logAggregationContext));
|
||||||
|
|
||||||
// Container finishes
|
// Container finishes
|
||||||
String[] logFiles = new String[] { "stdout" };
|
String[] logFiles = new String[] { "sysout" };
|
||||||
finishContainer(appId, logAggregationService,
|
finishContainer(appId, logAggregationService,
|
||||||
ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
|
ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
|
||||||
AppLogAggregatorImpl aggregator =
|
AppLogAggregatorImpl aggregator =
|
||||||
|
@ -2184,6 +2184,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
aggregator.getLogAggregationTimes());
|
aggregator.getLogAggregationTimes());
|
||||||
assertEquals(expectedAggregationReportNum,
|
assertEquals(expectedAggregationReportNum,
|
||||||
this.context.getLogAggregationStatusForApps().size());
|
this.context.getLogAggregationStatusForApps().size());
|
||||||
|
assertEquals(expectedCleanupOldLogsTimes,
|
||||||
|
aggregator.getCleanupOldLogTimes());
|
||||||
}
|
}
|
||||||
|
|
||||||
private int numOfLogsAvailable(LogAggregationService logAggregationService,
|
private int numOfLogsAvailable(LogAggregationService logAggregationService,
|
||||||
|
|
Loading…
Reference in New Issue