diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c1bb6aa68d2..213e5e640a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1514,6 +1514,13 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS
= 10 * 60 * 1000;
+ /**
+ * Whether to clean up nodemanager logs when log aggregation is enabled.
+ */
+ public static final String LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP =
+ YARN_PREFIX + "log-aggregation.enable-local-cleanup";
+ public static final boolean DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP = true;
+
/**
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
* aggregation is disabled
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 4be357b78a6..2d58a1cac2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1516,6 +1516,15 @@
600000
+
+ Whether to clean up nodemanager logs when log aggregation is enabled. Setting to
+ false disables the cleanup nodemanager logging, and it causes disk full in the long run. Users
+ can set to false for test-only purpose.
+
+ yarn.log-aggregation.enable-local-cleanup
+ true
+
+
Time in seconds to retain user logs. Only applicable if
log aggregation is disabled
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 4cc0dc3c713..0a8ddc17b1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -86,6 +86,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final Dispatcher dispatcher;
private final ApplicationId appId;
private final String applicationId;
+ private final boolean enableLocalCleanup;
private boolean logAggregationDisabled = false;
private final Configuration conf;
private final DeletionService delService;
@@ -172,6 +173,13 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.logAggregationContext = logAggregationContext;
this.context = context;
this.nodeId = nodeId;
+ this.enableLocalCleanup =
+ conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
+ if (!this.enableLocalCleanup) {
+ LOG.warn("{} is only for testing and not for any production system ",
+ YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
+ }
this.logAggPolicy = getLogAggPolicy(conf);
this.recoveredLogInitedTime = recoveredLogInitedTime;
this.logFileSizeThreshold =
@@ -337,26 +345,26 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
appFinished, finishedContainers.contains(container));
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
- LOG.trace("Uploaded the following files for {}: {}",
- container, uploadedFilePathsInThisCycle.toString());
- List uploadedFilePathsInThisCycleList = new ArrayList<>();
- uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
- if (LOG.isDebugEnabled()) {
- for (Path uploadedFilePath : uploadedFilePathsInThisCycleList) {
- try {
- long fileSize = lfs.getFileStatus(uploadedFilePath).getLen();
- if (fileSize >= logFileSizeThreshold) {
- LOG.debug("Log File " + uploadedFilePath
- + " size is " + fileSize + " bytes");
+ if (enableLocalCleanup) {
+ LOG.trace("Uploaded the following files for {}: {}", container,
+ uploadedFilePathsInThisCycle.toString());
+ List uploadedFilePathsInThisCycleList = new ArrayList<>();
+ uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
+ if (LOG.isDebugEnabled()) {
+ for (Path uploadedFilePath : uploadedFilePathsInThisCycleList) {
+ try {
+ long fileSize = lfs.getFileStatus(uploadedFilePath).getLen();
+ if (fileSize >= logFileSizeThreshold) {
+ LOG.debug("Log File " + uploadedFilePath + " size is " + fileSize + " bytes");
+ }
+ } catch (Exception e1) {
+ LOG.error("Failed to get log file size " + e1);
}
- } catch (Exception e1) {
- LOG.error("Failed to get log file size " + e1);
}
}
+ deletionTask = new FileDeletionTask(delService, this.userUgi.getShortUserName(), null,
+ uploadedFilePathsInThisCycleList);
}
- deletionTask = new FileDeletionTask(delService,
- this.userUgi.getShortUserName(), null,
- uploadedFilePathsInThisCycleList);
}
// This container is finished, and all its logs have been uploaded,
@@ -528,6 +536,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
private void doAppLogAggregationPostCleanUp() {
+ if (!enableLocalCleanup) {
+ return;
+ }
// Remove the local app-log-dirs
List localAppLogDirs = new ArrayList();
for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 4cc9ac1f3a7..8185f5019c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -234,31 +234,47 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
// ensure filesystems were closed
verify(logAggregationService).closeFileSystems(
any(UserGroupInformation.class));
- List dirList = new ArrayList<>();
- dirList.add(new Path(app1LogDir.toURI()));
- verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher(
- delSrvc, user, null, dirList)));
-
- String containerIdStr = container11.toString();
- File containerLogDir = new File(app1LogDir, containerIdStr);
- int count = 0;
- int maxAttempts = 50;
- for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
- File f = new File(containerLogDir, fileType);
- count = 0;
- while ((f.exists()) && (count < maxAttempts)) {
- count++;
- Thread.sleep(100);
+ boolean filesShouldBeDeleted =
+ this.conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
+ if (filesShouldBeDeleted) {
+ List dirList = new ArrayList<>();
+ dirList.add(new Path(app1LogDir.toURI()));
+ verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher(
+ delSrvc, user, null, dirList)));
+
+ String containerIdStr = container11.toString();
+ File containerLogDir = new File(app1LogDir, containerIdStr);
+ int count = 0;
+ int maxAttempts = 50;
+ for (String fileType : new String[]{"stdout", "stderr", "syslog"}) {
+ File f = new File(containerLogDir, fileType);
+ count = 0;
+ while ((f.exists()) && (count < maxAttempts)) {
+ count++;
+ Thread.sleep(100);
+ }
+ Assert.assertFalse("File [" + f + "] was not deleted", f.exists());
}
- Assert.assertFalse("File [" + f + "] was not deleted", f.exists());
+ Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
+ app1LogDir.exists());
+ } else {
+ List dirList = new ArrayList<>();
+ dirList.add(new Path(app1LogDir.toURI()));
+ verify(delSrvc, never()).delete(argThat(new FileDeletionMatcher(
+ delSrvc, user, null, dirList)));
+
+ String containerIdStr = container11.toString();
+ File containerLogDir = new File(app1LogDir, containerIdStr);
+ Thread.sleep(5000);
+ for (String fileType : new String[]{"stdout", "stderr", "syslog"}) {
+ File f = new File(containerLogDir, fileType);
+ Assert.assertTrue("File [" + f + "] was not deleted", f.exists());
+ }
+ Assert.assertTrue("Directory [" + app1LogDir + "] was not deleted",
+ app1LogDir.exists());
}
- count = 0;
- while ((app1LogDir.exists()) && (count < maxAttempts)) {
- count++;
- Thread.sleep(100);
- }
- Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
- app1LogDir.exists());
+ delSrvc.stop();
Path logFilePath = logAggregationService
.getLogAggregationFileController(conf)
@@ -297,6 +313,20 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
verifyLocalFileDeletion(logAggregationService);
}
+ @Test
+ public void testLocalFileRemainsAfterUploadOnCleanupDisable() throws Exception {
+ this.delSrvc = new DeletionService(createContainerExecutor());
+ delSrvc = spy(delSrvc);
+ this.delSrvc.init(conf);
+ this.conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP, false);
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+ LogAggregationService logAggregationService = spy(
+ new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler));
+ verifyLocalFileDeletion(logAggregationService);
+ }
+
@Test
public void testLocalFileDeletionOnDiskFull() throws Exception {
this.delSrvc = new DeletionService(createContainerExecutor());