From 1656bcec5f5ad949c97e66e51ac38bf26e8093f3 Mon Sep 17 00:00:00 2001 From: Ming Ma Date: Fri, 26 Feb 2016 08:40:05 -0800 Subject: [PATCH] YARN-4720. Skip unnecessary NN operations in log aggregation. (Jun Gong via mingma) (cherry picked from commit 7f3139e54da2c496327446a5eac43f8421fc8839) --- hadoop-yarn-project/CHANGES.txt | 3 + .../logaggregation/AppLogAggregatorImpl.java | 68 ++++++++++++------- .../TestLogAggregationService.java | 59 ++++++++++++++++ 3 files changed, 106 insertions(+), 24 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cf4d5fd266c..863fa103fda 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -826,6 +826,9 @@ Release 2.8.0 - UNRELEASED YARN-4066. Large number of queues choke fair scheduler. (Johan Gustavsson via kasha) + YARN-4720. Skip unnecessary NN operations in log aggregation. + (Jun Gong via mingma) + BUG FIXES YARN-4680. TimerTasks leak in ATS V1.5 Writer. (Xuan Gong via gtcarrera9) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 82a1ad41ac8..da7fc14cb6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -127,6 +127,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator { // This variable is only for testing private final AtomicBoolean waiting = new AtomicBoolean(false); + // This variable is only for testing + private int logAggregationTimes = 0; + private boolean renameTemporaryLogFileFailed = false; private final Map containerLogAggregators = @@ -311,7 +314,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } LogWriter writer = null; + String diagnosticMessage = ""; + boolean logAggregationSucceedInThisCycle = true; try { + if (pendingContainerInThisCycle.isEmpty()) { + return; + } + + logAggregationTimes++; + try { writer = new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, @@ -321,6 +332,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { writer.writeApplicationOwner(this.userUgi.getShortUserName()); } catch (IOException e1) { + logAggregationSucceedInThisCycle = false; LOG.error("Cannot create writer for app " + this.applicationId + ". Skip log upload this time. ", e1); return; @@ -369,20 +381,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator { remoteNodeLogFileForApp.getName() + "_" + currentTime); - String diagnosticMessage = ""; - boolean logAggregationSucceedInThisCycle = true; final boolean rename = uploadedLogsInThisCycle; try { userUgi.doAs(new PrivilegedExceptionAction() { @Override public Object run() throws Exception { FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf); - if (remoteFS.exists(remoteNodeTmpLogFileForApp)) { - if (rename) { - remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath); - } else { - remoteFS.delete(remoteNodeTmpLogFileForApp, false); - } + if (rename) { + remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath); + } else { + remoteFS.delete(remoteNodeTmpLogFileForApp, false); } return null; } @@ -405,33 +413,39 @@ public class AppLogAggregatorImpl implements AppLogAggregator { renameTemporaryLogFileFailed = true; logAggregationSucceedInThisCycle = false; } - - LogAggregationReport report = - Records.newRecord(LogAggregationReport.class); - report.setApplicationId(appId); - report.setDiagnosticMessage(diagnosticMessage); - report.setLogAggregationStatus(logAggregationSucceedInThisCycle - ? LogAggregationStatus.RUNNING - : LogAggregationStatus.RUNNING_WITH_FAILURE); - this.context.getLogAggregationStatusForApps().add(report); + } finally { + LogAggregationStatus logAggregationStatus = + logAggregationSucceedInThisCycle + ? LogAggregationStatus.RUNNING + : LogAggregationStatus.RUNNING_WITH_FAILURE; + sendLogAggregationReport(logAggregationStatus, diagnosticMessage); if (appFinished) { // If the app is finished, one extra final report with log aggregation // status SUCCEEDED/FAILED will be sent to RM to inform the RM // that the log aggregation in this NM is completed. - LogAggregationReport finalReport = - Records.newRecord(LogAggregationReport.class); - finalReport.setApplicationId(appId); - finalReport.setLogAggregationStatus(renameTemporaryLogFileFailed - ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED); - this.context.getLogAggregationStatusForApps().add(finalReport); + LogAggregationStatus finalLogAggregationStatus = + renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle + ? LogAggregationStatus.FAILED + : LogAggregationStatus.SUCCEEDED; + sendLogAggregationReport(finalLogAggregationStatus, ""); } - } finally { + if (writer != null) { writer.close(); } } } + private void sendLogAggregationReport( + LogAggregationStatus logAggregationStatus, String diagnosticMessage) { + LogAggregationReport report = + Records.newRecord(LogAggregationReport.class); + report.setApplicationId(appId); + report.setDiagnosticMessage(diagnosticMessage); + report.setLogAggregationStatus(logAggregationStatus); + this.context.getLogAggregationStatusForApps().add(report); + } + private void cleanOldLogs() { try { final FileSystem remoteFS = @@ -669,4 +683,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator { public UserGroupInformation getUgi() { return this.userUgi; } + + @Private + @VisibleForTesting + public int getLogAggregationTimes() { + return this.logAggregationTimes; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 87c3f27b445..0392b3899dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -2270,6 +2270,65 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.stop(); } + @Test (timeout = 20000) + public void testSkipUnnecessaryNNOperationsForShortJob() throws Exception { + LogAggregationContext logAggregationContext = + Records.newRecord(LogAggregationContext.class); + logAggregationContext.setLogAggregationPolicyClassName( + FailedOrKilledContainerLogAggregationPolicy.class.getName()); + verifySkipUnnecessaryNNOperations(logAggregationContext, 0, 2); + } + + @Test (timeout = 20000) + public void testSkipUnnecessaryNNOperationsForService() throws Exception { + this.conf.setLong( + YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, + 3600); + LogAggregationContext contextWithAMOnly = + Records.newRecord(LogAggregationContext.class); + contextWithAMOnly.setLogAggregationPolicyClassName( + AMOnlyLogAggregationPolicy.class.getName()); + contextWithAMOnly.setRolledLogsIncludePattern("sys*"); + contextWithAMOnly.setRolledLogsExcludePattern("std_final"); + verifySkipUnnecessaryNNOperations(contextWithAMOnly, 1, 4); + } + + private void verifySkipUnnecessaryNNOperations( + LogAggregationContext logAggregationContext, + int expectedLogAggregationTimes, int expectedAggregationReportNum) + throws Exception { + LogAggregationService logAggregationService = new LogAggregationService( + dispatcher, this.context, this.delSrvc, super.dirsHandler); + logAggregationService.init(this.conf); + logAggregationService.start(); + + ApplicationId appId = createApplication(); + logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, + null, this.acls, logAggregationContext)); + + // Container finishes + String[] logFiles = new String[] { "stdout" }; + finishContainer(appId, logAggregationService, + ContainerType.APPLICATION_MASTER, 1, 0, logFiles); + AppLogAggregatorImpl aggregator = + (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(appId); + aggregator.doLogAggregationOutOfBand(); + + Thread.sleep(2000); + aggregator.doLogAggregationOutOfBand(); + Thread.sleep(2000); + + // App finishes. + logAggregationService.handle(new LogHandlerAppFinishedEvent(appId)); + logAggregationService.stop(); + + assertEquals(expectedLogAggregationTimes, + aggregator.getLogAggregationTimes()); + assertEquals(expectedAggregationReportNum, + this.context.getLogAggregationStatusForApps().size()); + } + private int numOfLogsAvailable(LogAggregationService logAggregationService, ApplicationId appId, boolean sizeLimited, String lastLogFile) throws IOException {