diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index b6601b835d6..ff531cdc2a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4960,6 +4960,17 @@ public class YarnConfiguration extends Configuration {
public static final String APPS_CACHE_EXPIRE = YARN_PREFIX + "apps.cache.expire";
public static final String DEFAULT_APPS_CACHE_EXPIRE = "30s";
+ /** Enabled trigger log-dir deletion by size for NonAggregatingLogHandler. */
+ public static final String NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED = NM_PREFIX +
+ "log.trigger.delete.by-size.enabled";
+ public static final boolean DEFAULT_NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED = false;
+
+ /** Trigger log-dir deletion when the total log size of an app is greater than
+ * yarn.nodemanager.log.delete.threshold.
+ * Depends on yarn.nodemanager.log.trigger.delete.by-size.enabled = true. */
+ public static final String NM_LOG_DELETE_THRESHOLD = NM_PREFIX + "log.delete.threshold";
+ public static final long DEFAULT_NM_LOG_DELETE_THRESHOLD = 100L * 1024 * 1024 * 1024;
+
public YarnConfiguration() {
super();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 1f0982aede0..aea92260013 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5293,4 +5293,27 @@
+
+ yarn.nodemanager.log.trigger.delete.by-size.enabled
+ false
+
+ Optional.
+ Enabled trigger log-dir deletion by size for NonAggregatingLogHandler
+
+
+
+
+ yarn.nodemanager.log.delete.threshold
+ 100g
+
+ Optional.
+ Trigger log-dir deletion when the total log size of an app is greater than
+ yarn.nodemanager.log.delete.threshold and
+ yarn.nodemanager.log.trigger.delete.by-size.enabled = true.
+ You can use the following suffix (case insensitive): k(kilo), m(mega), g(giga), t(tera), p(peta),
+ e(exa) to specify the size (such as 128k, 512m, 1g, etc.),
+ Or provide complete size in bytes (such as 134217728 for 128 MB).
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index e8b2f472fb4..1721756f056 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -71,6 +71,8 @@ public class NonAggregatingLogHandler extends AbstractService implements
private final LocalDirsHandlerService dirsHandler;
private final NMStateStoreService stateStore;
private long deleteDelaySeconds;
+ private boolean enableTriggerDeleteBySize;
+ private long deleteThreshold;
private ScheduledThreadPoolExecutor sched;
public NonAggregatingLogHandler(Dispatcher dispatcher,
@@ -90,6 +92,12 @@ public class NonAggregatingLogHandler extends AbstractService implements
this.deleteDelaySeconds =
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
+ this.enableTriggerDeleteBySize =
+ conf.getBoolean(YarnConfiguration.NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED,
+ YarnConfiguration.DEFAULT_NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED);
+ this.deleteThreshold =
+ conf.getLongBytes(YarnConfiguration.NM_LOG_DELETE_THRESHOLD,
+ YarnConfiguration.DEFAULT_NM_LOG_DELETE_THRESHOLD);
sched = createScheduledThreadPoolExecutor(conf);
super.serviceInit(conf);
recover();
@@ -165,13 +173,9 @@ public class NonAggregatingLogHandler extends AbstractService implements
LogHandlerAppFinishedEvent appFinishedEvent =
(LogHandlerAppFinishedEvent) event;
ApplicationId appId = appFinishedEvent.getApplicationId();
- // Schedule - so that logs are available on the UI till they're deleted.
- LOG.info("Scheduling Log Deletion for application: "
- + appId + ", with delay of "
- + this.deleteDelaySeconds + " seconds");
String user = appOwners.remove(appId);
if (user == null) {
- LOG.error("Unable to locate user for " + appId);
+ LOG.error("Unable to locate user for {}", appId);
// send LOG_HANDLING_FAILED out
NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
new ApplicationEvent(appId,
@@ -191,8 +195,20 @@ public class NonAggregatingLogHandler extends AbstractService implements
LOG.error("Unable to record log deleter state", e);
}
try {
- sched.schedule(logDeleter, this.deleteDelaySeconds,
- TimeUnit.SECONDS);
+ boolean logDeleterStarted = false;
+ if (enableTriggerDeleteBySize) {
+ final long appLogSize = calculateSizeOfAppLogs(user, appId);
+ if (appLogSize >= deleteThreshold) {
+ LOG.info("Log Deletion for application: {}, with no delay, size={}", appId, appLogSize);
+ sched.schedule(logDeleter, 0, TimeUnit.SECONDS);
+ logDeleterStarted = true;
+ }
+ }
+ if (!logDeleterStarted) {
+ LOG.info("Scheduling Log Deletion for application: {}, with delay of {} seconds",
+ appId, this.deleteDelaySeconds);
+ sched.schedule(logDeleter, this.deleteDelaySeconds, TimeUnit.SECONDS);
+ }
} catch (RejectedExecutionException e) {
// Handling this event in local thread before starting threads
// or after calling sched.shutdownNow().
@@ -200,7 +216,6 @@ public class NonAggregatingLogHandler extends AbstractService implements
}
break;
default:
- ; // Ignore
}
}
@@ -220,6 +235,24 @@ public class NonAggregatingLogHandler extends AbstractService implements
return sched;
}
+ private long calculateSizeOfAppLogs(String user, ApplicationId applicationId) {
+ FileContext lfs = getLocalFileContext(getConfig());
+ long appLogsSize = 0L;
+ for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
+ Path logDir = new Path(rootLogDir, applicationId.toString());
+ try {
+ appLogsSize += lfs.getFileStatus(logDir).getLen();
+ } catch (UnsupportedFileSystemException ue) {
+ LOG.warn("Unsupported file system used for log dir {}", logDir, ue);
+ continue;
+ } catch (IOException ie) {
+ LOG.error("Unable to getFileStatus for {}", logDir, ie);
+ continue;
+ }
+ }
+ return appLogsSize;
+ }
+
class LogDeleterRunnable implements Runnable {
private String user;
private ApplicationId applicationId;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
index 8d5adf64321..55a201b7fc2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
@@ -596,4 +596,103 @@ public class TestNonAggregatingLogHandler {
}
}
+ @Test
+ public void testLogSizeThresholdDeletion() throws IOException {
+ ApplicationId anotherAppId = BuilderUtils.newApplicationId(4567, 1);
+ ContainerId container22 = BuilderUtils.newContainerId(appAttemptId, 2);
+ String user2 = "test_user2";
+ File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
+ String localLogDirsString = localLogDirs[0].getAbsolutePath() + ","
+ + localLogDirs[1].getAbsolutePath();
+
+ conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+ conf.setBoolean(YarnConfiguration.NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+ conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 60 * 1000);
+ conf.set(YarnConfiguration.NM_LOG_DELETE_THRESHOLD, "15g");
+
+ dirsHandler.init(conf);
+
+ NonAggregatingLogHandler rawLogHandler =
+ new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler,
+ new NMNullStateStoreService());
+ NonAggregatingLogHandler logHandler = spy(rawLogHandler);
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ doReturn(lfs).when(logHandler)
+ .getLocalFileContext(isA(Configuration.class));
+ FsPermission defaultPermission =
+ FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+ FileStatus fs1 =
+ new FileStatus(10 * 1024 * 1024 * 1024L, true, 1, 0,
+ System.currentTimeMillis(), 0, defaultPermission, "", "",
+ new Path(localLogDirs[0].getAbsolutePath()));
+ FileStatus fs2 =
+ new FileStatus(5 * 1024 * 1024 * 1024L, true, 1, 0,
+ System.currentTimeMillis(), 0, defaultPermission, "", "",
+ new Path(localLogDirs[0].getAbsolutePath()));
+ Path path1 = new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
+ Path path2 = new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
+ Path path3 = new Path(localLogDirs[0].getAbsolutePath(), anotherAppId.toString());
+ Path path4 = new Path(localLogDirs[1].getAbsolutePath(), anotherAppId.toString());
+
+ doReturn(fs1).when(spylfs).getFileStatus(eq(path1));
+ doReturn(fs1).when(spylfs).getFileStatus(eq(path2));
+ doReturn(fs2).when(spylfs).getFileStatus(eq(path3));
+ doReturn(fs2).when(spylfs).getFileStatus(eq(path4));
+
+ logHandler.init(conf);
+ logHandler.start();
+
+ logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
+
+ logHandler.handle(new LogHandlerContainerFinishedEvent(container11,
+ ContainerType.APPLICATION_MASTER, 0));
+
+ logHandler.handle(new LogHandlerAppFinishedEvent(appId));
+
+ logHandler.handle(new LogHandlerAppStartedEvent(anotherAppId, user2,
+ null, null));
+
+ logHandler.handle(new LogHandlerContainerFinishedEvent(container22,
+ ContainerType.APPLICATION_MASTER, 0));
+
+ logHandler.handle(new LogHandlerAppFinishedEvent(anotherAppId));
+
+ Path[] localAppLogDirs = new Path[]{path1, path2};
+ Path[] anotherLocalAppLogDirs = new Path[]{path3, path4};
+
+ testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirs);
+ testDeletionServiceNeverCall(mockDelService, user2, 5000, anotherLocalAppLogDirs);
+
+ logHandler.close();
+ for (int i = 0; i < localLogDirs.length; i++) {
+ FileUtils.deleteDirectory(localLogDirs[i]);
+ }
+ }
+
+ static void testDeletionServiceNeverCall(DeletionService delService, String user,
+ long timeout, Path... matchPaths) {
+ long verifyStartTime = System.currentTimeMillis();
+ WantedButNotInvoked notInvokedException = null;
+ boolean matched = false;
+ while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) {
+ try {
+ verify(delService, never()).delete(argThat(new FileDeletionMatcher(
+ delService, user, null, Arrays.asList(matchPaths))));
+ matched = true;
+ } catch (WantedButNotInvoked e) {
+ notInvokedException = e;
+ try {
+ Thread.sleep(50l);
+ } catch (InterruptedException i) {
+ }
+ }
+ }
+ if (!matched) {
+ throw notInvokedException;
+ }
+ return;
+ }
}