YARN-4720. Skip unnecessary NN operations in log aggregation. (Jun Gong via mingma)

(cherry picked from commit 7f3139e54d)
This commit is contained in:
Ming Ma 2016-02-26 08:40:05 -08:00
parent cf8bcc73c2
commit 6f5ca1b293
3 changed files with 106 additions and 24 deletions

View File

@ -645,6 +645,9 @@ Release 2.8.0 - UNRELEASED
YARN-4207. Add a non-judgemental YARN app completion status. (Rich Haase via sseth)
YARN-4720. Skip unnecessary NN operations in log aggregation.
(Jun Gong via mingma)
BUG FIXES
YARN-4680. TimerTasks leak in ATS V1.5 Writer. (Xuan Gong via gtcarrera9)

View File

@ -127,6 +127,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
// This variable is only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false);
// This variable is only for testing
private int logAggregationTimes = 0;
private boolean renameTemporaryLogFileFailed = false;
private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
@ -311,7 +314,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
LogWriter writer = null;
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
try {
if (pendingContainerInThisCycle.isEmpty()) {
return;
}
logAggregationTimes++;
try {
writer =
new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
@ -321,6 +332,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
writer.writeApplicationOwner(this.userUgi.getShortUserName());
} catch (IOException e1) {
logAggregationSucceedInThisCycle = false;
LOG.error("Cannot create writer for app " + this.applicationId
+ ". Skip log upload this time. ", e1);
return;
@ -369,21 +381,17 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
remoteNodeLogFileForApp.getName() + "_"
+ currentTime);
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
final boolean rename = uploadedLogsInThisCycle;
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
if (remoteFS.exists(remoteNodeTmpLogFileForApp)) {
if (rename) {
remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
} else {
remoteFS.delete(remoteNodeTmpLogFileForApp, false);
}
}
return null;
}
});
@ -405,33 +413,39 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
renameTemporaryLogFileFailed = true;
logAggregationSucceedInThisCycle = false;
}
LogAggregationReport report =
Records.newRecord(LogAggregationReport.class);
report.setApplicationId(appId);
report.setDiagnosticMessage(diagnosticMessage);
report.setLogAggregationStatus(logAggregationSucceedInThisCycle
} finally {
LogAggregationStatus logAggregationStatus =
logAggregationSucceedInThisCycle
? LogAggregationStatus.RUNNING
: LogAggregationStatus.RUNNING_WITH_FAILURE);
this.context.getLogAggregationStatusForApps().add(report);
: LogAggregationStatus.RUNNING_WITH_FAILURE;
sendLogAggregationReport(logAggregationStatus, diagnosticMessage);
if (appFinished) {
// If the app is finished, one extra final report with log aggregation
// status SUCCEEDED/FAILED will be sent to RM to inform the RM
// that the log aggregation in this NM is completed.
LogAggregationReport finalReport =
Records.newRecord(LogAggregationReport.class);
finalReport.setApplicationId(appId);
finalReport.setLogAggregationStatus(renameTemporaryLogFileFailed
? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED);
this.context.getLogAggregationStatusForApps().add(finalReport);
LogAggregationStatus finalLogAggregationStatus =
renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
? LogAggregationStatus.FAILED
: LogAggregationStatus.SUCCEEDED;
sendLogAggregationReport(finalLogAggregationStatus, "");
}
} finally {
if (writer != null) {
writer.close();
}
}
}
private void sendLogAggregationReport(
LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
LogAggregationReport report =
Records.newRecord(LogAggregationReport.class);
report.setApplicationId(appId);
report.setDiagnosticMessage(diagnosticMessage);
report.setLogAggregationStatus(logAggregationStatus);
this.context.getLogAggregationStatusForApps().add(report);
}
private void cleanOldLogs() {
try {
final FileSystem remoteFS =
@ -669,4 +683,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
public UserGroupInformation getUgi() {
return this.userUgi;
}
@Private
@VisibleForTesting
public int getLogAggregationTimes() {
return this.logAggregationTimes;
}
}

View File

@ -2127,6 +2127,65 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.stop();
}
@Test (timeout = 20000)
public void testSkipUnnecessaryNNOperationsForShortJob() throws Exception {
LogAggregationContext logAggregationContext =
Records.newRecord(LogAggregationContext.class);
logAggregationContext.setLogAggregationPolicyClassName(
FailedOrKilledContainerLogAggregationPolicy.class.getName());
verifySkipUnnecessaryNNOperations(logAggregationContext, 0, 2);
}
@Test (timeout = 20000)
public void testSkipUnnecessaryNNOperationsForService() throws Exception {
this.conf.setLong(
YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
3600);
LogAggregationContext contextWithAMOnly =
Records.newRecord(LogAggregationContext.class);
contextWithAMOnly.setLogAggregationPolicyClassName(
AMOnlyLogAggregationPolicy.class.getName());
contextWithAMOnly.setRolledLogsIncludePattern("sys*");
contextWithAMOnly.setRolledLogsExcludePattern("std_final");
verifySkipUnnecessaryNNOperations(contextWithAMOnly, 1, 4);
}
private void verifySkipUnnecessaryNNOperations(
LogAggregationContext logAggregationContext,
int expectedLogAggregationTimes, int expectedAggregationReportNum)
throws Exception {
LogAggregationService logAggregationService = new LogAggregationService(
dispatcher, this.context, this.delSrvc, super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
ApplicationId appId = createApplication();
logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user,
null, this.acls, logAggregationContext));
// Container finishes
String[] logFiles = new String[] { "stdout" };
finishContainer(appId, logAggregationService,
ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
AppLogAggregatorImpl aggregator =
(AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
.get(appId);
aggregator.doLogAggregationOutOfBand();
Thread.sleep(2000);
aggregator.doLogAggregationOutOfBand();
Thread.sleep(2000);
// App finishes.
logAggregationService.handle(new LogHandlerAppFinishedEvent(appId));
logAggregationService.stop();
assertEquals(expectedLogAggregationTimes,
aggregator.getLogAggregationTimes());
assertEquals(expectedAggregationReportNum,
this.context.getLogAggregationStatusForApps().size());
}
private int numOfLogsAvailable(LogAggregationService logAggregationService,
ApplicationId appId, boolean sizeLimited, String lastLogFile)
throws IOException {