From 16d71443373e8eda8644bee54f3f9a6979d66a56 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Fri, 10 Oct 2014 00:10:39 -0700 Subject: [PATCH] YARN-2583. Modified AggregatedLogDeletionService to be able to delete rolling aggregated logs. Contributed by Xuan Gong. (cherry picked from commit cb81bac0029fce3a9726df3523f0b692cd3375b8) --- hadoop-yarn-project/CHANGES.txt | 3 + .../AggregatedLogDeletionService.java | 93 ++++++++- .../TestAggregatedLogDeletionService.java | 192 ++++++++++++++++-- .../logaggregation/AppLogAggregatorImpl.java | 151 +++++++++++++- .../logaggregation/LogAggregationService.java | 2 +- .../TestLogAggregationService.java | 94 ++++++--- 6 files changed, 467 insertions(+), 68 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 674909c551f..ec31f8fb290 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -270,6 +270,9 @@ Release 2.6.0 - UNRELEASED YARN-2629. Made the distributed shell use the domain-based timeline ACLs. (zjshen) + YARN-2583. Modified AggregatedLogDeletionService to be able to delete rolling + aggregated logs. (Xuan Gong via zjshen) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index 590cfe20c49..4c1d152ccd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -24,38 +24,53 @@ import java.util.TimerTask; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import com.google.common.annotations.VisibleForTesting; /** * A service that periodically deletes aggregated logs. */ -@Private +@InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"}) public class AggregatedLogDeletionService extends AbstractService { private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class); private Timer timer = null; private long checkIntervalMsecs; + private LogDeletionTask task; static class LogDeletionTask extends TimerTask { private Configuration conf; private long retentionMillis; private String suffix = null; private Path remoteRootLogDir = null; + private ApplicationClientProtocol rmClient = null; - public LogDeletionTask(Configuration conf, long retentionSecs) { + public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) { this.conf = conf; this.retentionMillis = retentionSecs * 1000; this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); this.remoteRootLogDir = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + this.rmClient = rmClient; } @Override @@ -64,11 +79,10 @@ public class AggregatedLogDeletionService extends AbstractService { LOG.info("aggregated log deletion started."); try { FileSystem fs = remoteRootLogDir.getFileSystem(conf); - for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) { if(userDir.isDirectory()) { Path userDirPath = new Path(userDir.getPath(), suffix); - deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs); + deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient); } } } catch (IOException e) { @@ -79,18 +93,36 @@ public class AggregatedLogDeletionService extends AbstractService { } private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, - FileSystem fs) { + FileSystem fs, ApplicationClientProtocol rmClient) { try { for(FileStatus appDir : fs.listStatus(dir)) { if(appDir.isDirectory() && appDir.getModificationTime() < cutoffMillis) { - if(shouldDeleteLogDir(appDir, cutoffMillis, fs)) { + boolean appTerminated = + isApplicationTerminated(ConverterUtils.toApplicationId(appDir + .getPath().getName()), rmClient); + if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) { try { LOG.info("Deleting aggregated logs in "+appDir.getPath()); fs.delete(appDir.getPath(), true); } catch (IOException e) { logIOException("Could not delete "+appDir.getPath(), e); } + } else if (!appTerminated){ + try { + for(FileStatus node: fs.listStatus(appDir.getPath())) { + if(node.getModificationTime() < cutoffMillis) { + try { + fs.delete(node.getPath(), true); + } catch (IOException ex) { + logIOException("Could not delete "+appDir.getPath(), ex); + } + } + } + } catch(IOException e) { + logIOException( + "Error reading the contents of " + appDir.getPath(), e); + } } } } @@ -115,6 +147,29 @@ public class AggregatedLogDeletionService extends AbstractService { } return shouldDelete; } + + private static boolean isApplicationTerminated(ApplicationId appId, + ApplicationClientProtocol rmClient) throws IOException { + ApplicationReport appReport = null; + try { + appReport = + rmClient.getApplicationReport( + GetApplicationReportRequest.newInstance(appId)) + .getApplicationReport(); + } catch (ApplicationNotFoundException e) { + return true; + } catch (YarnException e) { + throw new IOException(e); + } + YarnApplicationState currentState = appReport.getYarnApplicationState(); + return currentState == YarnApplicationState.FAILED + || currentState == YarnApplicationState.KILLED + || currentState == YarnApplicationState.FINISHED; + } + + public ApplicationClientProtocol getRMClient() { + return this.rmClient; + } } private static void logIOException(String comment, IOException e) { @@ -140,6 +195,7 @@ public class AggregatedLogDeletionService extends AbstractService { @Override protected void serviceStop() throws Exception { + stopRMClient(); stopTimer(); super.serviceStop(); } @@ -156,10 +212,11 @@ public class AggregatedLogDeletionService extends AbstractService { } } - public void refreshLogRetentionSettings() { + public void refreshLogRetentionSettings() throws IOException { if (getServiceState() == STATE.STARTED) { Configuration conf = createConf(); setConfig(conf); + stopRMClient(); stopTimer(); scheduleLogDeletionTask(); } else { @@ -167,7 +224,7 @@ public class AggregatedLogDeletionService extends AbstractService { } } - private void scheduleLogDeletionTask() { + private void scheduleLogDeletionTask() throws IOException { Configuration conf = getConfig(); if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { @@ -183,7 +240,7 @@ public class AggregatedLogDeletionService extends AbstractService { return; } setLogAggCheckIntervalMsecs(retentionSecs); - TimerTask task = new LogDeletionTask(conf, retentionSecs); + task = new LogDeletionTask(conf, retentionSecs, creatRMClient()); timer = new Timer(); timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs); } @@ -201,4 +258,20 @@ public class AggregatedLogDeletionService extends AbstractService { protected Configuration createConf() { return new Configuration(); } + + // Directly create and use ApplicationClientProtocol. + // We have already marked ApplicationClientProtocol.getApplicationReport + // as @Idempotent, it will automatically take care of RM restart/failover. + @VisibleForTesting + protected ApplicationClientProtocol creatRMClient() throws IOException { + return ClientRMProxy.createRMProxy(getConfig(), + ApplicationClientProtocol.class); + } + + @VisibleForTesting + protected void stopRMClient() { + if (task != null && task.getRMClient() != null) { + RPC.stopProxy(task.getRMClient()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java index 05c7e7103c3..026996e010e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.logaggregation; import java.io.IOException; import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -27,6 +30,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Before; import org.junit.Test; @@ -51,7 +60,7 @@ public class TestAggregatedLogDeletionService { String root = "mockfs://foo/"; String remoteRootLogDir = root+"tmp/logs"; String suffix = "logs"; - Configuration conf = new Configuration(); + final Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); @@ -69,22 +78,37 @@ public class TestAggregatedLogDeletionService { when(mockFs.listStatus(remoteRootLogPath)).thenReturn( new FileStatus[]{userDirStatus}); - + + ApplicationId appId1 = + ApplicationId.newInstance(System.currentTimeMillis(), 1); Path userLogDir = new Path(userDir, suffix); - Path app1Dir = new Path(userLogDir, "application_1_1"); + Path app1Dir = new Path(userLogDir, appId1.toString()); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); - Path app2Dir = new Path(userLogDir, "application_1_2"); + ApplicationId appId2 = + ApplicationId.newInstance(System.currentTimeMillis(), 2); + Path app2Dir = new Path(userLogDir, appId2.toString()); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir); - Path app3Dir = new Path(userLogDir, "application_1_3"); + ApplicationId appId3 = + ApplicationId.newInstance(System.currentTimeMillis(), 3); + Path app3Dir = new Path(userLogDir, appId3.toString()); FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir); - Path app4Dir = new Path(userLogDir, "application_1_4"); + ApplicationId appId4 = + ApplicationId.newInstance(System.currentTimeMillis(), 4); + Path app4Dir = new Path(userLogDir, appId4.toString()); FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); + ApplicationId appId5 = + ApplicationId.newInstance(System.currentTimeMillis(), 5); + Path app5Dir = new Path(userLogDir, appId5.toString()); + FileStatus app5DirStatus = + new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir); + when(mockFs.listStatus(userLogDir)).thenReturn( - new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus, app4DirStatus}); + new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus, + app4DirStatus, app5DirStatus }); when(mockFs.listStatus(app1Dir)).thenReturn( new FileStatus[]{}); @@ -117,20 +141,55 @@ public class TestAggregatedLogDeletionService { when(mockFs.listStatus(app4Dir)).thenReturn( new FileStatus[]{app4Log1Status, app4Log2Status}); + + Path app5Log1 = new Path(app5Dir, "host1"); + FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1); - AggregatedLogDeletionService.LogDeletionTask task = - new AggregatedLogDeletionService.LogDeletionTask(conf, 1800); - - task.run(); - - verify(mockFs).delete(app1Dir, true); - verify(mockFs, times(0)).delete(app2Dir, true); - verify(mockFs).delete(app3Dir, true); - verify(mockFs).delete(app4Dir, true); + Path app5Log2 = new Path(app5Dir, "host2"); + FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2); + + when(mockFs.listStatus(app5Dir)).thenReturn( + new FileStatus[]{app5Log1Status, app5Log2Status}); + + final List finishedApplications = + Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3, + appId4)); + final List runningApplications = + Collections.unmodifiableList(Arrays.asList(appId5)); + + AggregatedLogDeletionService deletionService = + new AggregatedLogDeletionService() { + @Override + protected ApplicationClientProtocol creatRMClient() + throws IOException { + try { + return createMockRMClient(finishedApplications, + runningApplications); + } catch (Exception e) { + throw new IOException(e); + } + } + @Override + protected void stopRMClient() { + // DO NOTHING + } + }; + deletionService.init(conf); + deletionService.start(); + + verify(mockFs, timeout(2000)).delete(app1Dir, true); + verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true); + verify(mockFs, timeout(2000)).delete(app3Dir, true); + verify(mockFs, timeout(2000)).delete(app4Dir, true); + verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true); + verify(mockFs, timeout(2000)).delete(app5Log1, true); + verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true); + + deletionService.stop(); } @Test - public void testRefreshLogRetentionSettings() throws IOException { + public void testRefreshLogRetentionSettings() throws Exception { long now = System.currentTimeMillis(); //time before 2000 sec long before2000Secs = now - (2000 * 1000); @@ -163,13 +222,17 @@ public class TestAggregatedLogDeletionService { Path userLogDir = new Path(userDir, suffix); + ApplicationId appId1 = + ApplicationId.newInstance(System.currentTimeMillis(), 1); //Set time last modified of app1Dir directory and its files to before2000Secs - Path app1Dir = new Path(userLogDir, "application_1_1"); + Path app1Dir = new Path(userLogDir, appId1.toString()); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs, app1Dir); + ApplicationId appId2 = + ApplicationId.newInstance(System.currentTimeMillis(), 2); //Set time last modified of app1Dir directory and its files to before50Secs - Path app2Dir = new Path(userLogDir, "application_1_2"); + Path app2Dir = new Path(userLogDir, appId2.toString()); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs, app2Dir); @@ -190,11 +253,27 @@ public class TestAggregatedLogDeletionService { when(mockFs.listStatus(app2Dir)).thenReturn( new FileStatus[] { app2Log1Status }); + final List finishedApplications = + Collections.unmodifiableList(Arrays.asList(appId1, appId2)); + AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() { @Override protected Configuration createConf() { return conf; } + @Override + protected ApplicationClientProtocol creatRMClient() + throws IOException { + try { + return createMockRMClient(finishedApplications, null); + } catch (Exception e) { + throw new IOException(e); + } + } + @Override + protected void stopRMClient() { + // DO NOTHING + } }; deletionSvc.init(conf); @@ -253,8 +332,10 @@ public class TestAggregatedLogDeletionService { when(mockFs.listStatus(remoteRootLogPath)).thenReturn( new FileStatus[]{userDirStatus}); + ApplicationId appId1 = + ApplicationId.newInstance(System.currentTimeMillis(), 1); Path userLogDir = new Path(userDir, suffix); - Path app1Dir = new Path(userLogDir, "application_1_1"); + Path app1Dir = new Path(userLogDir, appId1.toString()); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir); when(mockFs.listStatus(userLogDir)).thenReturn( @@ -266,8 +347,25 @@ public class TestAggregatedLogDeletionService { when(mockFs.listStatus(app1Dir)).thenReturn( new FileStatus[]{app1Log1Status}); + final List finishedApplications = + Collections.unmodifiableList(Arrays.asList(appId1)); + AggregatedLogDeletionService deletionSvc = - new AggregatedLogDeletionService(); + new AggregatedLogDeletionService() { + @Override + protected ApplicationClientProtocol creatRMClient() + throws IOException { + try { + return createMockRMClient(finishedApplications, null); + } catch (Exception e) { + throw new IOException(e); + } + } + @Override + protected void stopRMClient() { + // DO NOTHING + } + }; deletionSvc.init(conf); deletionSvc.start(); @@ -286,11 +384,61 @@ public class TestAggregatedLogDeletionService { deletionSvc.stop(); } - + static class MockFileSystem extends FilterFileSystem { MockFileSystem() { super(mock(FileSystem.class)); } public void initialize(URI name, Configuration conf) throws IOException {} } + + private static ApplicationClientProtocol createMockRMClient( + List finishedApplicaitons, + List runningApplications) throws Exception { + final ApplicationClientProtocol mockProtocol = + mock(ApplicationClientProtocol.class); + if (finishedApplicaitons != null && !finishedApplicaitons.isEmpty()) { + for (ApplicationId appId : finishedApplicaitons) { + GetApplicationReportRequest request = + GetApplicationReportRequest.newInstance(appId); + GetApplicationReportResponse response = + createApplicationReportWithFinishedApplication(); + when(mockProtocol.getApplicationReport(request)) + .thenReturn(response); + } + } + if (runningApplications != null && !runningApplications.isEmpty()) { + for (ApplicationId appId : runningApplications) { + GetApplicationReportRequest request = + GetApplicationReportRequest.newInstance(appId); + GetApplicationReportResponse response = + createApplicationReportWithRunningApplication(); + when(mockProtocol.getApplicationReport(request)) + .thenReturn(response); + } + } + return mockProtocol; + } + + private static GetApplicationReportResponse + createApplicationReportWithRunningApplication() { + ApplicationReport report = mock(ApplicationReport.class); + when(report.getYarnApplicationState()).thenReturn( + YarnApplicationState.RUNNING); + GetApplicationReportResponse response = + mock(GetApplicationReportResponse.class); + when(response.getApplicationReport()).thenReturn(report); + return response; + } + + private static GetApplicationReportResponse + createApplicationReportWithFinishedApplication() { + ApplicationReport report = mock(ApplicationReport.class); + when(report.getYarnApplicationState()).thenReturn( + YarnApplicationState.FINISHED); + GetApplicationReportResponse response = + mock(GetApplicationReportResponse.class); + when(response.getApplicationReport()).thenReturn(report); + return response; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 318caf2c1ad..63f7c66ede2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -20,6 +20,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -33,6 +37,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; @@ -41,6 +46,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; @@ -65,6 +72,23 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private static final Log LOG = LogFactory .getLog(AppLogAggregatorImpl.class); private static final int THREAD_SLEEP_TIME = 1000; + // This is temporary solution. The configuration will be deleted once + // we find a more scalable method to only write a single log file per LRS. + private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP + = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app"; + private static final int + DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30; + + // This configuration is for debug and test purpose. By setting + // this configuration as true. We can break the lower bound of + // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS. + private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED + = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled"; + private static final boolean + DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED = false; + + private static final long + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS = 3600; private final LocalDirsHandlerService dirsHandler; private final Dispatcher dispatcher; @@ -85,13 +109,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final Map appAcls; private final LogAggregationContext logAggregationContext; private final Context context; + private final int retentionSize; + private final long rollingMonitorInterval; + private final NodeId nodeId; private final Map containerLogAggregators = new HashMap(); public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, - ApplicationId appId, UserGroupInformation userUgi, + ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, ContainerLogsRetentionPolicy retentionPolicy, Map appAcls, @@ -111,6 +138,51 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.appAcls = appAcls; this.logAggregationContext = logAggregationContext; this.context = context; + this.nodeId = nodeId; + int configuredRentionSize = + conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP, + DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP); + if (configuredRentionSize <= 0) { + this.retentionSize = + DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP; + } else { + this.retentionSize = configuredRentionSize; + } + long configuredRollingMonitorInterval = + this.logAggregationContext == null ? -1 : this.logAggregationContext + .getRollingIntervalSeconds(); + boolean debug_mode = + conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, + DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED); + if (configuredRollingMonitorInterval > 0 + && configuredRollingMonitorInterval < + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) { + if (debug_mode) { + this.rollingMonitorInterval = configuredRollingMonitorInterval; + } else { + LOG.warn( + "rollingMonitorIntervall should be more than or equal to " + + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS + + " seconds. Using " + + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS + + " seconds instead."); + this.rollingMonitorInterval = + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS; + } + } else { + if (configuredRollingMonitorInterval <= 0) { + LOG.warn("rollingMonitorInterval is set as " + + configuredRollingMonitorInterval + ". " + + "The log rolling mornitoring interval is disabled. " + + "The logs will be aggregated after this application is finished."); + } else { + LOG.warn("rollingMonitorInterval is set as " + + configuredRollingMonitorInterval + ". " + + "The logs will be aggregated every " + + configuredRollingMonitorInterval + " seconds"); + } + this.rollingMonitorInterval = configuredRollingMonitorInterval; + } } private void uploadLogsForContainers() { @@ -181,12 +253,17 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } } + // Before upload logs, make sure the number of existing logs + // is smaller than the configured NM log aggregation retention size. + if (uploadedLogsInThisCycle) { + cleanOldLogs(); + } + if (writer != null) { writer.close(); } - final Path renamedPath = logAggregationContext == null || - logAggregationContext.getRollingIntervalSeconds() <= 0 + final Path renamedPath = this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp : new Path( remoteNodeLogFileForApp.getParent(), remoteNodeLogFileForApp.getName() + "_" @@ -198,9 +275,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator { @Override public Object run() throws Exception { FileSystem remoteFS = FileSystem.get(conf); - if (remoteFS.exists(remoteNodeTmpLogFileForApp) - && rename) { - remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath); + if (remoteFS.exists(remoteNodeTmpLogFileForApp)) { + if (rename) { + remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath); + } else { + remoteFS.delete(remoteNodeTmpLogFileForApp, false); + } } return null; } @@ -218,6 +298,60 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } } + private void cleanOldLogs() { + try { + final FileSystem remoteFS = + this.remoteNodeLogFileForApp.getFileSystem(conf); + Path appDir = + this.remoteNodeLogFileForApp.getParent().makeQualified( + remoteFS.getUri(), remoteFS.getWorkingDirectory()); + Set status = + new HashSet(Arrays.asList(remoteFS.listStatus(appDir))); + + Iterable mask = + Iterables.filter(status, new Predicate() { + @Override + public boolean apply(FileStatus next) { + return next.getPath().getName() + .contains(LogAggregationUtils.getNodeString(nodeId)) + && !next.getPath().getName().endsWith( + LogAggregationUtils.TMP_FILE_SUFFIX); + } + }); + status = Sets.newHashSet(mask); + // Normally, we just need to delete one oldest log + // before we upload a new log. + // If we can not delete the older logs in this cycle, + // we will delete them in next cycle. + if (status.size() >= this.retentionSize) { + // sort by the lastModificationTime ascending + List statusList = new ArrayList(status); + Collections.sort(statusList, new Comparator() { + public int compare(FileStatus s1, FileStatus s2) { + return s1.getModificationTime() < s2.getModificationTime() ? -1 + : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0; + } + }); + for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) { + final FileStatus remove = statusList.get(i); + try { + userUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws Exception { + remoteFS.delete(remove.getPath(), false); + return null; + } + }); + } catch (Exception e) { + LOG.error("Failed to delete " + remove.getPath(), e); + } + } + } + } catch (Exception e) { + LOG.error("Failed to clean old logs", e); + } + } + @Override public void run() { try { @@ -235,9 +369,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { while (!this.appFinishing.get() && !this.aborted.get()) { synchronized(this) { try { - if (this.logAggregationContext != null && this.logAggregationContext - .getRollingIntervalSeconds() > 0) { - wait(this.logAggregationContext.getRollingIntervalSeconds() * 1000); + if (this.rollingMonitorInterval > 0) { + wait(this.rollingMonitorInterval * 1000); if (this.appFinishing.get() || this.aborted.get()) { break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 772f3f12225..1d6a9e168ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -342,7 +342,7 @@ public class LogAggregationService extends AbstractService implements // New application final AppLogAggregator appLogAggregator = new AppLogAggregatorImpl(this.dispatcher, this.deletionService, - getConfig(), appId, userUgi, dirsHandler, + getConfig(), appId, userUgi, this.nodeId, dirsHandler, getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, appAcls, logAggregationContext, this.context); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 36c54dcbe53..2c0f349e7f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -699,7 +699,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } } - private void verifyContainerLogs(LogAggregationService logAggregationService, + private String verifyContainerLogs(LogAggregationService logAggregationService, ApplicationId appId, ContainerId[] expectedContainerIds, String[] logFiles, int numOfContainerLogs, boolean multiLogs) throws IOException { @@ -811,6 +811,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { Assert.assertEquals(0, thisContainerMap.size()); } Assert.assertEquals(0, logMap.size()); + return targetNodeFile.getPath().getName(); } finally { reader.close(); } @@ -1219,17 +1220,32 @@ public class TestLogAggregationService extends BaseContainerManagerTest { dispatcher.stop(); } - @SuppressWarnings("unchecked") @Test (timeout = 50000) public void testLogAggregationServiceWithInterval() throws Exception { - final int maxAttempts = 50; + testLogAggregationService(false); + } + + @Test (timeout = 50000) + public void testLogAggregationServiceWithRetention() throws Exception { + testLogAggregationService(true); + } + + @SuppressWarnings("unchecked") + private void testLogAggregationService(boolean retentionSizeLimitation) + throws Exception { LogAggregationContext logAggregationContextWithInterval = Records.newRecord(LogAggregationContext.class); logAggregationContextWithInterval.setRollingIntervalSeconds(5000); - this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); + if (retentionSizeLimitation) { + // set the retention size as 1. The number of logs for one application + // in one NM should be 1. + this.conf.setInt(YarnConfiguration.NM_PREFIX + + "log-aggregation.num-log-files-per-app", 1); + } + // by setting this configuration, the log files will not be deleted immediately after // they are aggregated to remote directory. // We could use it to test whether the previous aggregated log files will be aggregated @@ -1280,23 +1296,29 @@ public class TestLogAggregationService extends BaseContainerManagerTest { .get(application); aggregator.doLogAggregationOutOfBand(); - int count = 0; - while (numOfLogsAvailable(logAggregationService, application) != 1 - && count <= maxAttempts) { - Thread.sleep(100); - count++; + if (retentionSizeLimitation) { + Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, + 50, 1, true, null)); + } else { + Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, + 50, 1, false, null)); } + String logFileInLastCycle = null; // Container logs should be uploaded - verifyContainerLogs(logAggregationService, application, + logFileInLastCycle = verifyContainerLogs(logAggregationService, application, new ContainerId[] { container }, logFiles1, 3, true); + Thread.sleep(2000); + // There is no log generated at this time. Do the log aggregation again. aggregator.doLogAggregationOutOfBand(); // Same logs will not be aggregated again. // Only one aggregated log file in Remote file directory. - Assert.assertEquals(numOfLogsAvailable(logAggregationService, application), - 1); + Assert.assertEquals(numOfLogsAvailable(logAggregationService, + application, true, null), 1); + + Thread.sleep(2000); // Do log aggregation String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" }; @@ -1304,16 +1326,19 @@ public class TestLogAggregationService extends BaseContainerManagerTest { aggregator.doLogAggregationOutOfBand(); - count = 0; - while (numOfLogsAvailable(logAggregationService, application) != 2 - && count <= maxAttempts) { - Thread.sleep(100); - count ++; + if (retentionSizeLimitation) { + Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, + 50, 1, true, logFileInLastCycle)); + } else { + Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, + 50, 2, false, null)); } // Container logs should be uploaded - verifyContainerLogs(logAggregationService, application, + logFileInLastCycle = verifyContainerLogs(logAggregationService, application, new ContainerId[] { container }, logFiles2, 3, true); + Thread.sleep(2000); + // create another logs String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" }; writeContainerLogs(appLogDir, container, logFiles3); @@ -1323,13 +1348,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest { dispatcher.await(); logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); - count = 0; - while (numOfLogsAvailable(logAggregationService, application) != 3 - && count <= maxAttempts) { - Thread.sleep(100); - count ++; + if (retentionSizeLimitation) { + Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, + 50, 1, true, logFileInLastCycle)); + } else { + Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, + 50, 3, false, null)); } - verifyContainerLogs(logAggregationService, application, new ContainerId[] { container }, logFiles3, 3, true); logAggregationService.stop(); @@ -1338,7 +1363,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } private int numOfLogsAvailable(LogAggregationService logAggregationService, - ApplicationId appId) throws IOException { + ApplicationId appId, boolean sizeLimited, String lastLogFile) + throws IOException { Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user); RemoteIterator nodeFiles = null; try { @@ -1354,7 +1380,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest { while (nodeFiles.hasNext()) { FileStatus status = nodeFiles.next(); String filename = status.getPath().getName(); - if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)) { + if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX) + || (lastLogFile != null && filename.contains(lastLogFile) + && sizeLimited)) { return -1; } if (filename.contains(LogAggregationUtils @@ -1364,4 +1392,18 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } return count; } + + private boolean waitAndCheckLogNum( + LogAggregationService logAggregationService, ApplicationId application, + int maxAttempts, int expectNum, boolean sizeLimited, String lastLogFile) + throws IOException, InterruptedException { + int count = 0; + while (numOfLogsAvailable(logAggregationService, application, sizeLimited, + lastLogFile) != expectNum && count <= maxAttempts) { + Thread.sleep(500); + count++; + } + return numOfLogsAvailable(logAggregationService, application, sizeLimited, + lastLogFile) == expectNum; + } }