From 75bc6cfced30bb1a186aebbe9990744a20d7ff0e Mon Sep 17 00:00:00 2001 From: 9uapaw Date: Tue, 14 Jun 2022 16:14:08 +0200 Subject: [PATCH] YARN-11176. Refactor TestAggregatedLogDeletionService. Contributed by Szilard Nemeth. --- .../TestAggregatedLogDeletionService.java | 684 ++++++++---------- 1 file changed, 305 insertions(+), 379 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java index daa2fc6b01c..f855f9181cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -44,156 +44,172 @@ import org.junit.Test; import org.junit.Assert; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.*; public class TestAggregatedLogDeletionService { - + + private static final String T_FILE = "TFile"; + private static final String USER_ME = "me"; + private static final String DIR_HOST1 = "host1"; + private static final String DIR_HOST2 = "host2"; + + private static final String ROOT = "mockfs://foo/"; + private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs"; + private static final String SUFFIX = "logs"; + private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX; + private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600; + + private static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir, + ApplicationId appId, + String user, String suffix, + long modificationTime) { + Path path = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogDir, appId, user, suffix); + FileStatus fileStatus = createEmptyFileStatus(modificationTime, path); + return new PathWithFileStatus(path, fileStatus); + } + + private static FileStatus createEmptyFileStatus(long modificationTime, Path path) { + return new FileStatus(0, true, 0, 0, modificationTime, path); + } + + private static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir, + long modificationTime) { + Path logPath = new Path(baseDir, childDir); + FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath); + return new PathWithFileStatus(logPath, fStatus); + } + + private static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir, + long modificationTime) { + Path logPath = new Path(baseDir, childDir); + FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath); + return new PathWithFileStatus(logPath, fStatus); + } + + private static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath, + String user, + String suffix, + ApplicationId appId, + long modificationTime) { + Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId); + FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir); + return new PathWithFileStatus(bucketDir, fStatus); + } + + private static FileStatus createFileStatusWithLengthForFile(long length, + long modificationTime, + Path logPath) { + return new FileStatus(length, false, 1, 1, modificationTime, logPath); + } + + private static FileStatus createFileStatusWithLengthForDir(long length, + long modificationTime, + Path logPath) { + return new FileStatus(length, true, 1, 1, modificationTime, logPath); + } + @Before public void closeFilesystems() throws IOException { // prevent the same mockfs instance from being reused due to FS cache FileSystem.closeAll(); } + private Configuration setupConfiguration(int retainSeconds, int retainCheckIntervalSeconds) { + Configuration conf = new Configuration(); + conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, retainSeconds); + conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, + retainCheckIntervalSeconds); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_ROOT_LOG_DIR); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, SUFFIX); + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, T_FILE); + conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, T_FILE), + LogAggregationTFileController.class.getName()); + return conf; + } + @Test public void testDeletion() throws Exception { long now = System.currentTimeMillis(); - long toDeleteTime = now - (2000*1000); - long toKeepTime = now - (1500*1000); - String root = "mockfs://foo/"; - String remoteRootLogDir = root+"tmp/logs"; - String suffix = "logs"; - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; - final Configuration conf = new Configuration(); - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), - LogAggregationTFileController.class.getName()); + long toDeleteTime = now - (2000 * 1000); + long toKeepTime = now - (1500 * 1000); + Configuration conf = setupConfiguration(1800, -1); - Path rootPath = new Path(root); + Path rootPath = new Path(ROOT); FileSystem rootFs = rootPath.getFileSystem(conf); FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); - Path remoteRootLogPath = new Path(remoteRootLogDir); - - Path userDir = new Path(remoteRootLogPath, "me"); - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir); - - when(mockFs.listStatus(remoteRootLogPath)).thenReturn( - new FileStatus[]{userDirStatus}); + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR); + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, + toKeepTime); - ApplicationId appId1 = - ApplicationId.newInstance(now, 1); - Path suffixDir = new Path(userDir, newSuffix); - FileStatus suffixDirStatus = new FileStatus(0, true, - 0, 0, toDeleteTime, suffixDir); - Path bucketDir = LogAggregationUtils.getRemoteBucketDir( - remoteRootLogPath, "me", suffix, appId1); - FileStatus bucketDirStatus = new FileStatus(0, true, 0, - 0, toDeleteTime, bucketDir); - Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId1, "me", suffix); - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, - toDeleteTime, app1Dir); - - ApplicationId appId2 = - ApplicationId.newInstance(now, 2); - Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId2, "me", suffix); - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, - toDeleteTime, app2Dir); - - ApplicationId appId3 = - ApplicationId.newInstance(now, 3); - Path app3Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId3, "me", suffix); - FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, - toDeleteTime, app3Dir); - - ApplicationId appId4 = - ApplicationId.newInstance(now, 4); - Path app4Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId4, "me", suffix); - FileStatus app4DirStatus = - new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus}); - when(mockFs.listStatus(userDir)).thenReturn( - new FileStatus[] {suffixDirStatus}); - when(mockFs.listStatus(suffixDir)).thenReturn( - new FileStatus[] {bucketDirStatus}); - when(mockFs.listStatus(bucketDir)).thenReturn( - new FileStatus[] {app1DirStatus, app2DirStatus, - app3DirStatus, app4DirStatus}); - - when(mockFs.listStatus(app1Dir)).thenReturn( - new FileStatus[]{}); + ApplicationId appId1 = ApplicationId.newInstance(now, 1); + ApplicationId appId2 = ApplicationId.newInstance(now, 2); + ApplicationId appId3 = ApplicationId.newInstance(now, 3); + ApplicationId appId4 = ApplicationId.newInstance(now, 4); + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, + toDeleteTime); + PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(remoteRootLogPath, SUFFIX, + toDeleteTime); - Path app2Log1 = new Path(app2Dir, "host1"); - FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1); - - Path app2Log2 = new Path(app2Dir, "host2"); - FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2); - - when(mockFs.listStatus(app2Dir)).thenReturn( - new FileStatus[]{app2Log1Status, app2Log2Status}); - - Path app3Log1 = new Path(app3Dir, "host1"); - FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1); - - Path app3Log2 = new Path(app3Dir, "host2"); - FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2); - - when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :(")); - - when(mockFs.listStatus(app3Dir)).thenReturn( - new FileStatus[]{app3Log1Status, app3Log2Status}); - - Path app4Log1 = new Path(app4Dir, "host1"); - FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1); - - Path app4Log2 = new Path(app4Dir, "host2"); - FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, - toKeepTime, app4Log2); + PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1, + USER_ME, SUFFIX, toDeleteTime); + PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2, + USER_ME, SUFFIX, toDeleteTime); + PathWithFileStatus app3 = createPathWithFileStatusForAppId(remoteRootLogPath, appId3, + USER_ME, SUFFIX, toDeleteTime); + PathWithFileStatus app4 = createPathWithFileStatusForAppId(remoteRootLogPath, appId4, + USER_ME, SUFFIX, toDeleteTime); - when(mockFs.listStatus(app4Dir)).thenReturn( - new FileStatus[]{app4Log1Status, app4Log2Status}); + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus}); + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus}); + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] { + app1.fileStatus, app2.fileStatus, app3.fileStatus, app4.fileStatus}); + + PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1, + toDeleteTime); + PathWithFileStatus app2Log2 = createFileLogPathWithFileStatus(app2.path, DIR_HOST2, toKeepTime); + PathWithFileStatus app3Log1 = createFileLogPathWithFileStatus(app3.path, DIR_HOST1, + toDeleteTime); + PathWithFileStatus app3Log2 = createFileLogPathWithFileStatus(app3.path, DIR_HOST2, + toDeleteTime); + PathWithFileStatus app4Log1 = createFileLogPathWithFileStatus(app4.path, DIR_HOST1, + toDeleteTime); + PathWithFileStatus app4Log2 = createFileLogPathWithFileStatus(app4.path, DIR_HOST2, toKeepTime); - final List finishedApplications = - Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3)); - final List runningApplications = - Collections.unmodifiableList(Arrays.asList(appId4)); + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{}); + when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{app2Log1.fileStatus, + app2Log2.fileStatus}); + when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log1.fileStatus, + app3Log2.fileStatus}); + when(mockFs.listStatus(app4.path)).thenReturn(new FileStatus[]{app4Log1.fileStatus, + app4Log2.fileStatus}); + when(mockFs.delete(app3.path, true)).thenThrow( + new AccessControlException("Injected Error\nStack Trace :(")); + + final List finishedApplications = Collections.unmodifiableList( + Arrays.asList(appId1, appId2, appId3)); + final List runningApplications = Collections.singletonList(appId4); AggregatedLogDeletionService deletionService = - new AggregatedLogDeletionService() { - @Override - protected ApplicationClientProtocol createRMClient() - throws IOException { - try { - return createMockRMClient(finishedApplications, - runningApplications); - } catch (Exception e) { - throw new IOException(e); - } - } - @Override - protected void stopRMClient() { - // DO NOTHING - } - }; + new AggregatedLogDeletionServiceForTest(runningApplications, finishedApplications); deletionService.init(conf); deletionService.start(); - verify(mockFs, timeout(2000)).delete(app1Dir, true); - verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true); - verify(mockFs, timeout(2000)).delete(app3Dir, true); - verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true); - verify(mockFs, timeout(2000)).delete(app4Log1, true); - verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true); + int timeout = 2000; + verify(mockFs, timeout(timeout)).delete(app1.path, true); + verify(mockFs, timeout(timeout).times(0)).delete(app2.path, true); + verify(mockFs, timeout(timeout)).delete(app3.path, true); + verify(mockFs, timeout(timeout).times(0)).delete(app4.path, true); + verify(mockFs, timeout(timeout)).delete(app4Log1.path, true); + verify(mockFs, timeout(timeout).times(0)).delete(app4Log2.path, true); deletionService.stop(); } @@ -201,357 +217,216 @@ protected void stopRMClient() { @Test public void testRefreshLogRetentionSettings() throws Exception { long now = System.currentTimeMillis(); - //time before 2000 sec long before2000Secs = now - (2000 * 1000); - //time before 50 sec long before50Secs = now - (50 * 1000); - String root = "mockfs://foo/"; - String remoteRootLogDir = root + "tmp/logs"; - String suffix = "logs"; - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; - final Configuration conf = new Configuration(); - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, - "1"); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), - LogAggregationTFileController.class.getName()); + int checkIntervalSeconds = 2; + int checkIntervalMilliSeconds = checkIntervalSeconds * 1000; + Configuration conf = setupConfiguration(1800, 1); - Path rootPath = new Path(root); + Path rootPath = new Path(ROOT); FileSystem rootFs = rootPath.getFileSystem(conf); FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem(); - Path remoteRootLogPath = new Path(remoteRootLogDir); + ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2); - Path userDir = new Path(remoteRootLogPath, "me"); - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs, - userDir); + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR); - when(mockFs.listStatus(remoteRootLogPath)).thenReturn( - new FileStatus[] { userDirStatus }); + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, + before50Secs); + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, + before50Secs); + PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath, + USER_ME, SUFFIX, appId1, before50Secs); - Path suffixDir = new Path(userDir, newSuffix); - FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs, - suffixDir); + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[] {userDir.fileStatus}); - ApplicationId appId1 = - ApplicationId.newInstance(System.currentTimeMillis(), 1); //Set time last modified of app1Dir directory and its files to before2000Secs - Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId1, "me", suffix); - Path bucketDir = LogAggregationUtils.getRemoteBucketDir( - remoteRootLogPath, "me", suffix, appId1); - FileStatus bucketDirStatus = new FileStatus(0, true, 0, - 0, before50Secs, bucketDir); - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs, - app1Dir); - - ApplicationId appId2 = - ApplicationId.newInstance(System.currentTimeMillis(), 2); - //Set time last modified of app1Dir directory and its files to before50Secs - Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId2, "me", suffix); - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs, - app2Dir); + PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1, + USER_ME, SUFFIX, before2000Secs); - when(mockFs.listStatus(userDir)).thenReturn( - new FileStatus[] {suffixStatus }); - when(mockFs.listStatus(suffixDir)).thenReturn( - new FileStatus[] {bucketDirStatus }); - when(mockFs.listStatus(bucketDir)).thenReturn( - new FileStatus[] {app1DirStatus, app2DirStatus }); + //Set time last modified of app1Dir directory and its files to before50Secs + PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2, + USER_ME, SUFFIX, before50Secs); - Path app1Log1 = new Path(app1Dir, "host1"); - FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs, - app1Log1); + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus}); + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus}); + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{app1.fileStatus, + app2.fileStatus}); - when(mockFs.listStatus(app1Dir)).thenReturn( - new FileStatus[] { app1Log1Status }); + PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, + before2000Secs); + PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1, + before50Secs); - Path app2Log1 = new Path(app2Dir, "host1"); - FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs, - app2Log1); - - when(mockFs.listStatus(app2Dir)).thenReturn( - new FileStatus[] { app2Log1Status }); + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[] {app1Log1.fileStatus}); + when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[] {app2Log1.fileStatus}); final List finishedApplications = Collections.unmodifiableList(Arrays.asList(appId1, appId2)); - AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() { - @Override - protected Configuration createConf() { - return conf; - } - @Override - protected ApplicationClientProtocol createRMClient() - throws IOException { - try { - return createMockRMClient(finishedApplications, null); - } catch (Exception e) { - throw new IOException(e); - } - } - @Override - protected void stopRMClient() { - // DO NOTHING - } - }; + AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null, + finishedApplications, conf); deletionSvc.init(conf); deletionSvc.start(); //app1Dir would be deleted since its done above log retention period - verify(mockFs, timeout(10000)).delete(app1Dir, true); - //app2Dir is not expected to be deleted since its below the threshold - verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true); + verify(mockFs, timeout(10000)).delete(app1.path, true); + //app2Dir is not expected to be deleted since it is below the threshold + verify(mockFs, timeout(3000).times(0)).delete(app2.path, true); - //Now,lets change the confs - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50"); - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, - "2"); + //Now, let's change the confs + conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50); + conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, + checkIntervalSeconds); //We have not called refreshLogSettings,hence don't expect to see the changed conf values - Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs()); + assertTrue(checkIntervalMilliSeconds != deletionSvc.getCheckIntervalMsecs()); //refresh the log settings deletionSvc.refreshLogRetentionSettings(); //Check interval time should reflect the new value - Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs()); + Assert.assertEquals(checkIntervalMilliSeconds, deletionSvc.getCheckIntervalMsecs()); //app2Dir should be deleted since it falls above the threshold - verify(mockFs, timeout(10000)).delete(app2Dir, true); + verify(mockFs, timeout(10000)).delete(app2.path, true); deletionSvc.stop(); } @Test public void testCheckInterval() throws Exception { - long RETENTION_SECS = 10 * 24 * 3600; long now = System.currentTimeMillis(); - long toDeleteTime = now - RETENTION_SECS*1000; - - String root = "mockfs://foo/"; - String remoteRootLogDir = root+"tmp/logs"; - String suffix = "logs"; - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; - Configuration conf = new Configuration(); - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000"); - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1"); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), - LogAggregationTFileController.class.getName()); + long toDeleteTime = now - TEN_DAYS_IN_SECONDS * 1000; + Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1); // prevent us from picking up the same mockfs instance from another test FileSystem.closeAll(); - Path rootPath = new Path(root); + Path rootPath = new Path(ROOT); FileSystem rootFs = rootPath.getFileSystem(conf); FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); - Path remoteRootLogPath = new Path(remoteRootLogDir); + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR); - Path userDir = new Path(remoteRootLogPath, "me"); - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir); + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, now); + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, now); - when(mockFs.listStatus(remoteRootLogPath)).thenReturn( - new FileStatus[]{userDirStatus}); + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus}); - ApplicationId appId1 = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path suffixDir = new Path(userDir, newSuffix); - FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now, - suffixDir); - Path bucketDir = LogAggregationUtils.getRemoteBucketDir( - remoteRootLogPath, "me", suffix, appId1); - Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId1, "me", suffix); - FileStatus bucketDirStatus = new FileStatus(0, true, 0, - 0, now, bucketDir); + ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); + PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath, + USER_ME, SUFFIX, appId1, now); - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir); + PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1, + USER_ME, SUFFIX, now); + PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, now); - when(mockFs.listStatus(userDir)).thenReturn( - new FileStatus[] {suffixDirStatus}); - when(mockFs.listStatus(suffixDir)).thenReturn( - new FileStatus[] {bucketDirStatus}); - when(mockFs.listStatus(bucketDir)).thenReturn( - new FileStatus[] {app1DirStatus}); + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus}); + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus}); + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus}); + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus}); - Path app1Log1 = new Path(app1Dir, "host1"); - FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1); + final List finishedApplications = Collections.singletonList(appId1); - when(mockFs.listStatus(app1Dir)).thenReturn( - new FileStatus[]{app1Log1Status}); - - final List finishedApplications = - Collections.unmodifiableList(Arrays.asList(appId1)); - - AggregatedLogDeletionService deletionSvc = - new AggregatedLogDeletionService() { - @Override - protected ApplicationClientProtocol createRMClient() - throws IOException { - try { - return createMockRMClient(finishedApplications, null); - } catch (Exception e) { - throw new IOException(e); - } - } - @Override - protected void stopRMClient() { - // DO NOTHING - } - }; + AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null, + finishedApplications); deletionSvc.init(conf); deletionSvc.start(); verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class)); - verify(mockFs, never()).delete(app1Dir, true); + verify(mockFs, never()).delete(app1.path, true); - // modify the timestamp of the logs and verify it's picked up quickly - bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir); - app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); - app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1); - when(mockFs.listStatus(userDir)).thenReturn( - new FileStatus[] {suffixDirStatus}); - when(mockFs.listStatus(suffixDir)).thenReturn( - new FileStatus[] {bucketDirStatus }); - when(mockFs.listStatus(bucketDir)).thenReturn( - new FileStatus[] {app1DirStatus }); - when(mockFs.listStatus(app1Dir)).thenReturn( - new FileStatus[]{app1Log1Status}); + // modify the timestamp of the logs and verify if it's picked up quickly + app1.changeModificationTime(toDeleteTime); + app1Log1.changeModificationTime(toDeleteTime); + bucketDir.changeModificationTime(toDeleteTime); + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus}); + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus }); + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus }); + when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus}); - verify(mockFs, timeout(10000)).delete(app1Dir, true); + verify(mockFs, timeout(10000)).delete(app1.path, true); deletionSvc.stop(); } @Test public void testRobustLogDeletion() throws Exception { - final long RETENTION_SECS = 10 * 24 * 3600; - - String root = "mockfs://foo/"; - String remoteRootLogDir = root+"tmp/logs"; - String suffix = "logs"; - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; - Configuration conf = new Configuration(); - conf.setClass("fs.mockfs.impl", MockFileSystem.class, - FileSystem.class); - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000"); - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, - "1"); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), - LogAggregationTFileController.class.getName()); + Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1); // prevent us from picking up the same mockfs instance from another test FileSystem.closeAll(); - Path rootPath = new Path(root); + Path rootPath = new Path(ROOT); FileSystem rootFs = rootPath.getFileSystem(conf); FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); - Path remoteRootLogPath = new Path(remoteRootLogDir); + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR); - Path userDir = new Path(remoteRootLogPath, "me"); - Path suffixDir = new Path(userDir, newSuffix); - FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir); - FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir); - Path bucketDir = new Path(suffixDir, String.valueOf(0)); - FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir); + PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, 0); + PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, 0); + PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(suffixDir.path, "0", 0); - when(mockFs.listStatus(remoteRootLogPath)).thenReturn( - new FileStatus[]{userDirStatus}); - when(mockFs.listStatus(userDir)).thenReturn( - new FileStatus[]{suffixStatus}); - when(mockFs.listStatus(suffixDir)).thenReturn( - new FileStatus[]{bucketDirStatus}); + when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus}); + when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus}); + when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus}); - ApplicationId appId1 = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path app1Dir = new Path(bucketDir, appId1.toString()); - FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir); - ApplicationId appId2 = - ApplicationId.newInstance(System.currentTimeMillis(), 2); - Path app2Dir = new Path(bucketDir, "application_a"); - FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir); - ApplicationId appId3 = - ApplicationId.newInstance(System.currentTimeMillis(), 3); - Path app3Dir = new Path(bucketDir, appId3.toString()); - FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir); + ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2); + ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3); - when(mockFs.listStatus(bucketDir)).thenReturn( - new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus}); - when(mockFs.listStatus(app2Dir)).thenReturn( - new FileStatus[]{}); + PathWithFileStatus app1 = createDirLogPathWithFileStatus(bucketDir.path, appId1.toString(), 0); + PathWithFileStatus app2 = createDirLogPathWithFileStatus(bucketDir.path, "application_a", 0); + PathWithFileStatus app3 = createDirLogPathWithFileStatus(bucketDir.path, appId3.toString(), 0); + PathWithFileStatus app3Log3 = createDirLogPathWithFileStatus(app3.path, DIR_HOST1, 0); - when(mockFs.listStatus(app1Dir)).thenThrow( - new RuntimeException("Should Be Caught and Logged")); - Path app3Log3 = new Path(app3Dir, "host1"); - FileStatus app3Log3Status = new FileStatus(10, false, 1, 1, 0, app3Log3); - when(mockFs.listStatus(app3Dir)).thenReturn( - new FileStatus[]{app3Log3Status}); + when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{ + app1.fileStatus,app2.fileStatus, app3.fileStatus}); + when(mockFs.listStatus(app1.path)).thenThrow( + new RuntimeException("Should be caught and logged")); + when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{}); + when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log3.fileStatus}); - final List finishedApplications = - Collections.unmodifiableList(Arrays.asList(appId1, appId3)); + final List finishedApplications = Collections.unmodifiableList( + Arrays.asList(appId1, appId3)); - ApplicationClientProtocol rmClient = - createMockRMClient(finishedApplications, null); + ApplicationClientProtocol rmClient = createMockRMClient(finishedApplications, null); AggregatedLogDeletionService.LogDeletionTask deletionTask = - new AggregatedLogDeletionService.LogDeletionTask(conf, - RETENTION_SECS, - rmClient); + new AggregatedLogDeletionService.LogDeletionTask(conf, TEN_DAYS_IN_SECONDS, rmClient); deletionTask.run(); - verify(mockFs).delete(app3Dir, true); + verify(mockFs).delete(app3.path, true); } static class MockFileSystem extends FilterFileSystem { MockFileSystem() { super(mock(FileSystem.class)); } + public void initialize(URI name, Configuration conf) throws IOException {} } private static ApplicationClientProtocol createMockRMClient( - List finishedApplicaitons, + List finishedApplications, List runningApplications) throws Exception { - final ApplicationClientProtocol mockProtocol = - mock(ApplicationClientProtocol.class); - if (finishedApplicaitons != null && !finishedApplicaitons.isEmpty()) { - for (ApplicationId appId : finishedApplicaitons) { - GetApplicationReportRequest request = - GetApplicationReportRequest.newInstance(appId); - GetApplicationReportResponse response = - createApplicationReportWithFinishedApplication(); - when(mockProtocol.getApplicationReport(request)) - .thenReturn(response); + final ApplicationClientProtocol mockProtocol = mock(ApplicationClientProtocol.class); + if (finishedApplications != null && !finishedApplications.isEmpty()) { + for (ApplicationId appId : finishedApplications) { + GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId); + GetApplicationReportResponse response = createApplicationReportWithFinishedApplication(); + when(mockProtocol.getApplicationReport(request)).thenReturn(response); } } if (runningApplications != null && !runningApplications.isEmpty()) { for (ApplicationId appId : runningApplications) { - GetApplicationReportRequest request = - GetApplicationReportRequest.newInstance(appId); - GetApplicationReportResponse response = - createApplicationReportWithRunningApplication(); - when(mockProtocol.getApplicationReport(request)) - .thenReturn(response); + GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId); + GetApplicationReportResponse response = createApplicationReportWithRunningApplication(); + when(mockProtocol.getApplicationReport(request)).thenReturn(response); } } return mockProtocol; } - private static GetApplicationReportResponse - createApplicationReportWithRunningApplication() { + private static GetApplicationReportResponse createApplicationReportWithRunningApplication() { ApplicationReport report = mock(ApplicationReport.class); when(report.getYarnApplicationState()).thenReturn( YarnApplicationState.RUNNING); @@ -561,14 +436,65 @@ private static ApplicationClientProtocol createMockRMClient( return response; } - private static GetApplicationReportResponse - createApplicationReportWithFinishedApplication() { + private static GetApplicationReportResponse createApplicationReportWithFinishedApplication() { ApplicationReport report = mock(ApplicationReport.class); - when(report.getYarnApplicationState()).thenReturn( - YarnApplicationState.FINISHED); - GetApplicationReportResponse response = - mock(GetApplicationReportResponse.class); + when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED); + GetApplicationReportResponse response = mock(GetApplicationReportResponse.class); when(response.getApplicationReport()).thenReturn(report); return response; } + + private static class PathWithFileStatus { + private final Path path; + private FileStatus fileStatus; + + PathWithFileStatus(Path path, FileStatus fileStatus) { + this.path = path; + this.fileStatus = fileStatus; + } + + public void changeModificationTime(long modTime) { + fileStatus = new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(), + fileStatus.getReplication(), + fileStatus.getBlockSize(), modTime, fileStatus.getPath()); + } + } + + private static class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionService { + private final List finishedApplications; + private final List runningApplications; + private final Configuration conf; + + AggregatedLogDeletionServiceForTest(List runningApplications, + List finishedApplications) { + this(runningApplications, finishedApplications, null); + } + + AggregatedLogDeletionServiceForTest(List runningApplications, + List finishedApplications, + Configuration conf) { + this.runningApplications = runningApplications; + this.finishedApplications = finishedApplications; + this.conf = conf; + } + + @Override + protected ApplicationClientProtocol createRMClient() throws IOException { + try { + return createMockRMClient(finishedApplications, runningApplications); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + protected Configuration createConf() { + return conf; + } + + @Override + protected void stopRMClient() { + // DO NOTHING + } + } }