From ee94f6cdcbc4914cbdf1a2132f318083c4d38494 Mon Sep 17 00:00:00 2001 From: Xianming Lei <31424839+leixm@users.noreply.github.com> Date: Mon, 5 Jun 2023 11:08:06 +0800 Subject: [PATCH] YARN-11277. Trigger log-dir deletion by size for NonAggregatingLogHandler. (#4797) Reviewed-by: Akira Ajisaka Reviewed-by: Ashutosh Gupta Reviewed-by: Shilun Fan Signed-off-by: Shilun Fan --- .../hadoop/yarn/conf/YarnConfiguration.java | 11 +++ .../src/main/resources/yarn-default.xml | 23 +++++ .../loghandler/NonAggregatingLogHandler.java | 49 +++++++-- .../TestNonAggregatingLogHandler.java | 99 +++++++++++++++++++ 4 files changed, 174 insertions(+), 8 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index b6601b835d6..ff531cdc2a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4960,6 +4960,17 @@ public class YarnConfiguration extends Configuration { public static final String APPS_CACHE_EXPIRE = YARN_PREFIX + "apps.cache.expire"; public static final String DEFAULT_APPS_CACHE_EXPIRE = "30s"; + /** Enabled trigger log-dir deletion by size for NonAggregatingLogHandler. */ + public static final String NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED = NM_PREFIX + + "log.trigger.delete.by-size.enabled"; + public static final boolean DEFAULT_NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED = false; + + /** Trigger log-dir deletion when the total log size of an app is greater than + * yarn.nodemanager.log.delete.threshold. + * Depends on yarn.nodemanager.log.trigger.delete.by-size.enabled = true. */ + public static final String NM_LOG_DELETE_THRESHOLD = NM_PREFIX + "log.delete.threshold"; + public static final long DEFAULT_NM_LOG_DELETE_THRESHOLD = 100L * 1024 * 1024 * 1024; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1f0982aede0..aea92260013 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5293,4 +5293,27 @@ + + yarn.nodemanager.log.trigger.delete.by-size.enabled + false + + Optional. + Enabled trigger log-dir deletion by size for NonAggregatingLogHandler + + + + + yarn.nodemanager.log.delete.threshold + 100g + + Optional. + Trigger log-dir deletion when the total log size of an app is greater than + yarn.nodemanager.log.delete.threshold and + yarn.nodemanager.log.trigger.delete.by-size.enabled = true. + You can use the following suffix (case insensitive): k(kilo), m(mega), g(giga), t(tera), p(peta), + e(exa) to specify the size (such as 128k, 512m, 1g, etc.), + Or provide complete size in bytes (such as 134217728 for 128 MB). + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index e8b2f472fb4..1721756f056 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -71,6 +71,8 @@ public class NonAggregatingLogHandler extends AbstractService implements private final LocalDirsHandlerService dirsHandler; private final NMStateStoreService stateStore; private long deleteDelaySeconds; + private boolean enableTriggerDeleteBySize; + private long deleteThreshold; private ScheduledThreadPoolExecutor sched; public NonAggregatingLogHandler(Dispatcher dispatcher, @@ -90,6 +92,12 @@ public class NonAggregatingLogHandler extends AbstractService implements this.deleteDelaySeconds = conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS); + this.enableTriggerDeleteBySize = + conf.getBoolean(YarnConfiguration.NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED, + YarnConfiguration.DEFAULT_NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED); + this.deleteThreshold = + conf.getLongBytes(YarnConfiguration.NM_LOG_DELETE_THRESHOLD, + YarnConfiguration.DEFAULT_NM_LOG_DELETE_THRESHOLD); sched = createScheduledThreadPoolExecutor(conf); super.serviceInit(conf); recover(); @@ -165,13 +173,9 @@ public class NonAggregatingLogHandler extends AbstractService implements LogHandlerAppFinishedEvent appFinishedEvent = (LogHandlerAppFinishedEvent) event; ApplicationId appId = appFinishedEvent.getApplicationId(); - // Schedule - so that logs are available on the UI till they're deleted. - LOG.info("Scheduling Log Deletion for application: " - + appId + ", with delay of " - + this.deleteDelaySeconds + " seconds"); String user = appOwners.remove(appId); if (user == null) { - LOG.error("Unable to locate user for " + appId); + LOG.error("Unable to locate user for {}", appId); // send LOG_HANDLING_FAILED out NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle( new ApplicationEvent(appId, @@ -191,8 +195,20 @@ public class NonAggregatingLogHandler extends AbstractService implements LOG.error("Unable to record log deleter state", e); } try { - sched.schedule(logDeleter, this.deleteDelaySeconds, - TimeUnit.SECONDS); + boolean logDeleterStarted = false; + if (enableTriggerDeleteBySize) { + final long appLogSize = calculateSizeOfAppLogs(user, appId); + if (appLogSize >= deleteThreshold) { + LOG.info("Log Deletion for application: {}, with no delay, size={}", appId, appLogSize); + sched.schedule(logDeleter, 0, TimeUnit.SECONDS); + logDeleterStarted = true; + } + } + if (!logDeleterStarted) { + LOG.info("Scheduling Log Deletion for application: {}, with delay of {} seconds", + appId, this.deleteDelaySeconds); + sched.schedule(logDeleter, this.deleteDelaySeconds, TimeUnit.SECONDS); + } } catch (RejectedExecutionException e) { // Handling this event in local thread before starting threads // or after calling sched.shutdownNow(). @@ -200,7 +216,6 @@ public class NonAggregatingLogHandler extends AbstractService implements } break; default: - ; // Ignore } } @@ -220,6 +235,24 @@ public class NonAggregatingLogHandler extends AbstractService implements return sched; } + private long calculateSizeOfAppLogs(String user, ApplicationId applicationId) { + FileContext lfs = getLocalFileContext(getConfig()); + long appLogsSize = 0L; + for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) { + Path logDir = new Path(rootLogDir, applicationId.toString()); + try { + appLogsSize += lfs.getFileStatus(logDir).getLen(); + } catch (UnsupportedFileSystemException ue) { + LOG.warn("Unsupported file system used for log dir {}", logDir, ue); + continue; + } catch (IOException ie) { + LOG.error("Unable to getFileStatus for {}", logDir, ie); + continue; + } + } + return appLogsSize; + } + class LogDeleterRunnable implements Runnable { private String user; private ApplicationId applicationId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index 8d5adf64321..55a201b7fc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -596,4 +596,103 @@ public class TestNonAggregatingLogHandler { } } + @Test + public void testLogSizeThresholdDeletion() throws IOException { + ApplicationId anotherAppId = BuilderUtils.newApplicationId(4567, 1); + ContainerId container22 = BuilderUtils.newContainerId(appAttemptId, 2); + String user2 = "test_user2"; + File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2); + String localLogDirsString = localLogDirs[0].getAbsolutePath() + "," + + localLogDirs[1].getAbsolutePath(); + + conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); + conf.setBoolean(YarnConfiguration.NM_LOG_TRIGGER_DELETE_BY_SIZE_ENABLED, true); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 60 * 1000); + conf.set(YarnConfiguration.NM_LOG_DELETE_THRESHOLD, "15g"); + + dirsHandler.init(conf); + + NonAggregatingLogHandler rawLogHandler = + new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler, + new NMNullStateStoreService()); + NonAggregatingLogHandler logHandler = spy(rawLogHandler); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + FileContext lfs = FileContext.getFileContext(spylfs, conf); + doReturn(lfs).when(logHandler) + .getLocalFileContext(isA(Configuration.class)); + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + FileStatus fs1 = + new FileStatus(10 * 1024 * 1024 * 1024L, true, 1, 0, + System.currentTimeMillis(), 0, defaultPermission, "", "", + new Path(localLogDirs[0].getAbsolutePath())); + FileStatus fs2 = + new FileStatus(5 * 1024 * 1024 * 1024L, true, 1, 0, + System.currentTimeMillis(), 0, defaultPermission, "", "", + new Path(localLogDirs[0].getAbsolutePath())); + Path path1 = new Path(localLogDirs[0].getAbsolutePath(), appId.toString()); + Path path2 = new Path(localLogDirs[1].getAbsolutePath(), appId.toString()); + Path path3 = new Path(localLogDirs[0].getAbsolutePath(), anotherAppId.toString()); + Path path4 = new Path(localLogDirs[1].getAbsolutePath(), anotherAppId.toString()); + + doReturn(fs1).when(spylfs).getFileStatus(eq(path1)); + doReturn(fs1).when(spylfs).getFileStatus(eq(path2)); + doReturn(fs2).when(spylfs).getFileStatus(eq(path3)); + doReturn(fs2).when(spylfs).getFileStatus(eq(path4)); + + logHandler.init(conf); + logHandler.start(); + + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null)); + + logHandler.handle(new LogHandlerContainerFinishedEvent(container11, + ContainerType.APPLICATION_MASTER, 0)); + + logHandler.handle(new LogHandlerAppFinishedEvent(appId)); + + logHandler.handle(new LogHandlerAppStartedEvent(anotherAppId, user2, + null, null)); + + logHandler.handle(new LogHandlerContainerFinishedEvent(container22, + ContainerType.APPLICATION_MASTER, 0)); + + logHandler.handle(new LogHandlerAppFinishedEvent(anotherAppId)); + + Path[] localAppLogDirs = new Path[]{path1, path2}; + Path[] anotherLocalAppLogDirs = new Path[]{path3, path4}; + + testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirs); + testDeletionServiceNeverCall(mockDelService, user2, 5000, anotherLocalAppLogDirs); + + logHandler.close(); + for (int i = 0; i < localLogDirs.length; i++) { + FileUtils.deleteDirectory(localLogDirs[i]); + } + } + + static void testDeletionServiceNeverCall(DeletionService delService, String user, + long timeout, Path... matchPaths) { + long verifyStartTime = System.currentTimeMillis(); + WantedButNotInvoked notInvokedException = null; + boolean matched = false; + while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) { + try { + verify(delService, never()).delete(argThat(new FileDeletionMatcher( + delService, user, null, Arrays.asList(matchPaths)))); + matched = true; + } catch (WantedButNotInvoked e) { + notInvokedException = e; + try { + Thread.sleep(50l); + } catch (InterruptedException i) { + } + } + } + if (!matched) { + throw notInvokedException; + } + return; + } }