YARN-11176. Refactor TestAggregatedLogDeletionService. Contributed by Szilard Nemeth.
This commit is contained in:
parent
d0715b1024
commit
75bc6cfced
|
@ -44,156 +44,172 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
public class TestAggregatedLogDeletionService {
|
public class TestAggregatedLogDeletionService {
|
||||||
|
|
||||||
|
private static final String T_FILE = "TFile";
|
||||||
|
private static final String USER_ME = "me";
|
||||||
|
private static final String DIR_HOST1 = "host1";
|
||||||
|
private static final String DIR_HOST2 = "host2";
|
||||||
|
|
||||||
|
private static final String ROOT = "mockfs://foo/";
|
||||||
|
private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs";
|
||||||
|
private static final String SUFFIX = "logs";
|
||||||
|
private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX;
|
||||||
|
private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600;
|
||||||
|
|
||||||
|
private static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
|
||||||
|
ApplicationId appId,
|
||||||
|
String user, String suffix,
|
||||||
|
long modificationTime) {
|
||||||
|
Path path = LogAggregationUtils.getRemoteAppLogDir(
|
||||||
|
remoteRootLogDir, appId, user, suffix);
|
||||||
|
FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
|
||||||
|
return new PathWithFileStatus(path, fileStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
|
||||||
|
return new FileStatus(0, true, 0, 0, modificationTime, path);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
|
||||||
|
long modificationTime) {
|
||||||
|
Path logPath = new Path(baseDir, childDir);
|
||||||
|
FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
|
||||||
|
return new PathWithFileStatus(logPath, fStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
|
||||||
|
long modificationTime) {
|
||||||
|
Path logPath = new Path(baseDir, childDir);
|
||||||
|
FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
|
||||||
|
return new PathWithFileStatus(logPath, fStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
|
||||||
|
String user,
|
||||||
|
String suffix,
|
||||||
|
ApplicationId appId,
|
||||||
|
long modificationTime) {
|
||||||
|
Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
|
||||||
|
FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
|
||||||
|
return new PathWithFileStatus(bucketDir, fStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FileStatus createFileStatusWithLengthForFile(long length,
|
||||||
|
long modificationTime,
|
||||||
|
Path logPath) {
|
||||||
|
return new FileStatus(length, false, 1, 1, modificationTime, logPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FileStatus createFileStatusWithLengthForDir(long length,
|
||||||
|
long modificationTime,
|
||||||
|
Path logPath) {
|
||||||
|
return new FileStatus(length, true, 1, 1, modificationTime, logPath);
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void closeFilesystems() throws IOException {
|
public void closeFilesystems() throws IOException {
|
||||||
// prevent the same mockfs instance from being reused due to FS cache
|
// prevent the same mockfs instance from being reused due to FS cache
|
||||||
FileSystem.closeAll();
|
FileSystem.closeAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Configuration setupConfiguration(int retainSeconds, int retainCheckIntervalSeconds) {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
||||||
|
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||||
|
conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, retainSeconds);
|
||||||
|
conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
||||||
|
retainCheckIntervalSeconds);
|
||||||
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_ROOT_LOG_DIR);
|
||||||
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, SUFFIX);
|
||||||
|
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, T_FILE);
|
||||||
|
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, T_FILE),
|
||||||
|
LogAggregationTFileController.class.getName());
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeletion() throws Exception {
|
public void testDeletion() throws Exception {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
long toDeleteTime = now - (2000*1000);
|
long toDeleteTime = now - (2000 * 1000);
|
||||||
long toKeepTime = now - (1500*1000);
|
long toKeepTime = now - (1500 * 1000);
|
||||||
String root = "mockfs://foo/";
|
|
||||||
String remoteRootLogDir = root+"tmp/logs";
|
|
||||||
String suffix = "logs";
|
|
||||||
String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
|
|
||||||
final Configuration conf = new Configuration();
|
|
||||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
|
|
||||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
|
|
||||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
|
||||||
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
|
|
||||||
LogAggregationTFileController.class.getName());
|
|
||||||
|
|
||||||
|
Configuration conf = setupConfiguration(1800, -1);
|
||||||
|
|
||||||
Path rootPath = new Path(root);
|
Path rootPath = new Path(ROOT);
|
||||||
FileSystem rootFs = rootPath.getFileSystem(conf);
|
FileSystem rootFs = rootPath.getFileSystem(conf);
|
||||||
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
|
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
|
||||||
|
|
||||||
Path remoteRootLogPath = new Path(remoteRootLogDir);
|
Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
|
||||||
|
PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
|
||||||
Path userDir = new Path(remoteRootLogPath, "me");
|
toKeepTime);
|
||||||
FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir);
|
|
||||||
|
|
||||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
|
||||||
new FileStatus[]{userDirStatus});
|
|
||||||
|
|
||||||
ApplicationId appId1 =
|
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
|
||||||
ApplicationId.newInstance(now, 1);
|
|
||||||
Path suffixDir = new Path(userDir, newSuffix);
|
|
||||||
FileStatus suffixDirStatus = new FileStatus(0, true,
|
|
||||||
0, 0, toDeleteTime, suffixDir);
|
|
||||||
Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
|
|
||||||
remoteRootLogPath, "me", suffix, appId1);
|
|
||||||
FileStatus bucketDirStatus = new FileStatus(0, true, 0,
|
|
||||||
0, toDeleteTime, bucketDir);
|
|
||||||
Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
|
|
||||||
remoteRootLogPath, appId1, "me", suffix);
|
|
||||||
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0,
|
|
||||||
toDeleteTime, app1Dir);
|
|
||||||
|
|
||||||
ApplicationId appId2 =
|
|
||||||
ApplicationId.newInstance(now, 2);
|
|
||||||
Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
|
|
||||||
remoteRootLogPath, appId2, "me", suffix);
|
|
||||||
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0,
|
|
||||||
toDeleteTime, app2Dir);
|
|
||||||
|
|
||||||
ApplicationId appId3 =
|
|
||||||
ApplicationId.newInstance(now, 3);
|
|
||||||
Path app3Dir = LogAggregationUtils.getRemoteAppLogDir(
|
|
||||||
remoteRootLogPath, appId3, "me", suffix);
|
|
||||||
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0,
|
|
||||||
toDeleteTime, app3Dir);
|
|
||||||
|
|
||||||
ApplicationId appId4 =
|
|
||||||
ApplicationId.newInstance(now, 4);
|
|
||||||
Path app4Dir = LogAggregationUtils.getRemoteAppLogDir(
|
|
||||||
remoteRootLogPath, appId4, "me", suffix);
|
|
||||||
FileStatus app4DirStatus =
|
|
||||||
new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
|
|
||||||
|
|
||||||
when(mockFs.listStatus(userDir)).thenReturn(
|
ApplicationId appId1 = ApplicationId.newInstance(now, 1);
|
||||||
new FileStatus[] {suffixDirStatus});
|
ApplicationId appId2 = ApplicationId.newInstance(now, 2);
|
||||||
when(mockFs.listStatus(suffixDir)).thenReturn(
|
ApplicationId appId3 = ApplicationId.newInstance(now, 3);
|
||||||
new FileStatus[] {bucketDirStatus});
|
ApplicationId appId4 = ApplicationId.newInstance(now, 4);
|
||||||
when(mockFs.listStatus(bucketDir)).thenReturn(
|
|
||||||
new FileStatus[] {app1DirStatus, app2DirStatus,
|
|
||||||
app3DirStatus, app4DirStatus});
|
|
||||||
|
|
||||||
when(mockFs.listStatus(app1Dir)).thenReturn(
|
|
||||||
new FileStatus[]{});
|
|
||||||
|
|
||||||
|
PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
|
||||||
|
toDeleteTime);
|
||||||
|
PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(remoteRootLogPath, SUFFIX,
|
||||||
|
toDeleteTime);
|
||||||
|
|
||||||
Path app2Log1 = new Path(app2Dir, "host1");
|
PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
|
||||||
FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1);
|
USER_ME, SUFFIX, toDeleteTime);
|
||||||
|
PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
|
||||||
Path app2Log2 = new Path(app2Dir, "host2");
|
USER_ME, SUFFIX, toDeleteTime);
|
||||||
FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2);
|
PathWithFileStatus app3 = createPathWithFileStatusForAppId(remoteRootLogPath, appId3,
|
||||||
|
USER_ME, SUFFIX, toDeleteTime);
|
||||||
when(mockFs.listStatus(app2Dir)).thenReturn(
|
PathWithFileStatus app4 = createPathWithFileStatusForAppId(remoteRootLogPath, appId4,
|
||||||
new FileStatus[]{app2Log1Status, app2Log2Status});
|
USER_ME, SUFFIX, toDeleteTime);
|
||||||
|
|
||||||
Path app3Log1 = new Path(app3Dir, "host1");
|
|
||||||
FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1);
|
|
||||||
|
|
||||||
Path app3Log2 = new Path(app3Dir, "host2");
|
|
||||||
FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2);
|
|
||||||
|
|
||||||
when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :("));
|
|
||||||
|
|
||||||
when(mockFs.listStatus(app3Dir)).thenReturn(
|
|
||||||
new FileStatus[]{app3Log1Status, app3Log2Status});
|
|
||||||
|
|
||||||
Path app4Log1 = new Path(app4Dir, "host1");
|
|
||||||
FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1);
|
|
||||||
|
|
||||||
Path app4Log2 = new Path(app4Dir, "host2");
|
|
||||||
FileStatus app4Log2Status = new FileStatus(10, false, 1, 1,
|
|
||||||
toKeepTime, app4Log2);
|
|
||||||
|
|
||||||
when(mockFs.listStatus(app4Dir)).thenReturn(
|
when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
|
||||||
new FileStatus[]{app4Log1Status, app4Log2Status});
|
when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
|
||||||
|
when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {
|
||||||
|
app1.fileStatus, app2.fileStatus, app3.fileStatus, app4.fileStatus});
|
||||||
|
|
||||||
|
PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
|
||||||
|
toDeleteTime);
|
||||||
|
PathWithFileStatus app2Log2 = createFileLogPathWithFileStatus(app2.path, DIR_HOST2, toKeepTime);
|
||||||
|
PathWithFileStatus app3Log1 = createFileLogPathWithFileStatus(app3.path, DIR_HOST1,
|
||||||
|
toDeleteTime);
|
||||||
|
PathWithFileStatus app3Log2 = createFileLogPathWithFileStatus(app3.path, DIR_HOST2,
|
||||||
|
toDeleteTime);
|
||||||
|
PathWithFileStatus app4Log1 = createFileLogPathWithFileStatus(app4.path, DIR_HOST1,
|
||||||
|
toDeleteTime);
|
||||||
|
PathWithFileStatus app4Log2 = createFileLogPathWithFileStatus(app4.path, DIR_HOST2, toKeepTime);
|
||||||
|
|
||||||
final List<ApplicationId> finishedApplications =
|
when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{});
|
||||||
Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3));
|
when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{app2Log1.fileStatus,
|
||||||
final List<ApplicationId> runningApplications =
|
app2Log2.fileStatus});
|
||||||
Collections.unmodifiableList(Arrays.asList(appId4));
|
when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log1.fileStatus,
|
||||||
|
app3Log2.fileStatus});
|
||||||
|
when(mockFs.listStatus(app4.path)).thenReturn(new FileStatus[]{app4Log1.fileStatus,
|
||||||
|
app4Log2.fileStatus});
|
||||||
|
when(mockFs.delete(app3.path, true)).thenThrow(
|
||||||
|
new AccessControlException("Injected Error\nStack Trace :("));
|
||||||
|
|
||||||
|
final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
|
||||||
|
Arrays.asList(appId1, appId2, appId3));
|
||||||
|
final List<ApplicationId> runningApplications = Collections.singletonList(appId4);
|
||||||
|
|
||||||
AggregatedLogDeletionService deletionService =
|
AggregatedLogDeletionService deletionService =
|
||||||
new AggregatedLogDeletionService() {
|
new AggregatedLogDeletionServiceForTest(runningApplications, finishedApplications);
|
||||||
@Override
|
|
||||||
protected ApplicationClientProtocol createRMClient()
|
|
||||||
throws IOException {
|
|
||||||
try {
|
|
||||||
return createMockRMClient(finishedApplications,
|
|
||||||
runningApplications);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
protected void stopRMClient() {
|
|
||||||
// DO NOTHING
|
|
||||||
}
|
|
||||||
};
|
|
||||||
deletionService.init(conf);
|
deletionService.init(conf);
|
||||||
deletionService.start();
|
deletionService.start();
|
||||||
|
|
||||||
verify(mockFs, timeout(2000)).delete(app1Dir, true);
|
int timeout = 2000;
|
||||||
verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
|
verify(mockFs, timeout(timeout)).delete(app1.path, true);
|
||||||
verify(mockFs, timeout(2000)).delete(app3Dir, true);
|
verify(mockFs, timeout(timeout).times(0)).delete(app2.path, true);
|
||||||
verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true);
|
verify(mockFs, timeout(timeout)).delete(app3.path, true);
|
||||||
verify(mockFs, timeout(2000)).delete(app4Log1, true);
|
verify(mockFs, timeout(timeout).times(0)).delete(app4.path, true);
|
||||||
verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true);
|
verify(mockFs, timeout(timeout)).delete(app4Log1.path, true);
|
||||||
|
verify(mockFs, timeout(timeout).times(0)).delete(app4Log2.path, true);
|
||||||
|
|
||||||
deletionService.stop();
|
deletionService.stop();
|
||||||
}
|
}
|
||||||
|
@ -201,357 +217,216 @@ public class TestAggregatedLogDeletionService {
|
||||||
@Test
|
@Test
|
||||||
public void testRefreshLogRetentionSettings() throws Exception {
|
public void testRefreshLogRetentionSettings() throws Exception {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
//time before 2000 sec
|
|
||||||
long before2000Secs = now - (2000 * 1000);
|
long before2000Secs = now - (2000 * 1000);
|
||||||
//time before 50 sec
|
|
||||||
long before50Secs = now - (50 * 1000);
|
long before50Secs = now - (50 * 1000);
|
||||||
String root = "mockfs://foo/";
|
int checkIntervalSeconds = 2;
|
||||||
String remoteRootLogDir = root + "tmp/logs";
|
int checkIntervalMilliSeconds = checkIntervalSeconds * 1000;
|
||||||
String suffix = "logs";
|
|
||||||
String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
|
|
||||||
final Configuration conf = new Configuration();
|
|
||||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
|
||||||
"1");
|
|
||||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
|
|
||||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
|
||||||
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
|
|
||||||
LogAggregationTFileController.class.getName());
|
|
||||||
|
|
||||||
|
Configuration conf = setupConfiguration(1800, 1);
|
||||||
|
|
||||||
Path rootPath = new Path(root);
|
Path rootPath = new Path(ROOT);
|
||||||
FileSystem rootFs = rootPath.getFileSystem(conf);
|
FileSystem rootFs = rootPath.getFileSystem(conf);
|
||||||
FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
|
FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
|
||||||
|
|
||||||
Path remoteRootLogPath = new Path(remoteRootLogDir);
|
ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||||
|
ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
|
||||||
|
|
||||||
Path userDir = new Path(remoteRootLogPath, "me");
|
Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
|
||||||
FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
|
|
||||||
userDir);
|
|
||||||
|
|
||||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
|
||||||
new FileStatus[] { userDirStatus });
|
before50Secs);
|
||||||
|
PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
|
||||||
|
before50Secs);
|
||||||
|
PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
|
||||||
|
USER_ME, SUFFIX, appId1, before50Secs);
|
||||||
|
|
||||||
Path suffixDir = new Path(userDir, newSuffix);
|
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[] {userDir.fileStatus});
|
||||||
FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs,
|
|
||||||
suffixDir);
|
|
||||||
|
|
||||||
ApplicationId appId1 =
|
|
||||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
|
||||||
//Set time last modified of app1Dir directory and its files to before2000Secs
|
//Set time last modified of app1Dir directory and its files to before2000Secs
|
||||||
Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
|
PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
|
||||||
remoteRootLogPath, appId1, "me", suffix);
|
USER_ME, SUFFIX, before2000Secs);
|
||||||
Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
|
|
||||||
remoteRootLogPath, "me", suffix, appId1);
|
|
||||||
FileStatus bucketDirStatus = new FileStatus(0, true, 0,
|
|
||||||
0, before50Secs, bucketDir);
|
|
||||||
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
|
|
||||||
app1Dir);
|
|
||||||
|
|
||||||
ApplicationId appId2 =
|
|
||||||
ApplicationId.newInstance(System.currentTimeMillis(), 2);
|
|
||||||
//Set time last modified of app1Dir directory and its files to before50Secs
|
|
||||||
Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
|
|
||||||
remoteRootLogPath, appId2, "me", suffix);
|
|
||||||
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
|
|
||||||
app2Dir);
|
|
||||||
|
|
||||||
when(mockFs.listStatus(userDir)).thenReturn(
|
//Set time last modified of app1Dir directory and its files to before50Secs
|
||||||
new FileStatus[] {suffixStatus });
|
PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
|
||||||
when(mockFs.listStatus(suffixDir)).thenReturn(
|
USER_ME, SUFFIX, before50Secs);
|
||||||
new FileStatus[] {bucketDirStatus });
|
|
||||||
when(mockFs.listStatus(bucketDir)).thenReturn(
|
|
||||||
new FileStatus[] {app1DirStatus, app2DirStatus });
|
|
||||||
|
|
||||||
Path app1Log1 = new Path(app1Dir, "host1");
|
when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
|
||||||
FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
|
when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
|
||||||
app1Log1);
|
when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{app1.fileStatus,
|
||||||
|
app2.fileStatus});
|
||||||
|
|
||||||
when(mockFs.listStatus(app1Dir)).thenReturn(
|
PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1,
|
||||||
new FileStatus[] { app1Log1Status });
|
before2000Secs);
|
||||||
|
PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
|
||||||
|
before50Secs);
|
||||||
|
|
||||||
Path app2Log1 = new Path(app2Dir, "host1");
|
when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[] {app1Log1.fileStatus});
|
||||||
FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
|
when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[] {app2Log1.fileStatus});
|
||||||
app2Log1);
|
|
||||||
|
|
||||||
when(mockFs.listStatus(app2Dir)).thenReturn(
|
|
||||||
new FileStatus[] { app2Log1Status });
|
|
||||||
|
|
||||||
final List<ApplicationId> finishedApplications =
|
final List<ApplicationId> finishedApplications =
|
||||||
Collections.unmodifiableList(Arrays.asList(appId1, appId2));
|
Collections.unmodifiableList(Arrays.asList(appId1, appId2));
|
||||||
|
|
||||||
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
|
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
|
||||||
@Override
|
finishedApplications, conf);
|
||||||
protected Configuration createConf() {
|
|
||||||
return conf;
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
protected ApplicationClientProtocol createRMClient()
|
|
||||||
throws IOException {
|
|
||||||
try {
|
|
||||||
return createMockRMClient(finishedApplications, null);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
protected void stopRMClient() {
|
|
||||||
// DO NOTHING
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
deletionSvc.init(conf);
|
deletionSvc.init(conf);
|
||||||
deletionSvc.start();
|
deletionSvc.start();
|
||||||
|
|
||||||
//app1Dir would be deleted since its done above log retention period
|
//app1Dir would be deleted since its done above log retention period
|
||||||
verify(mockFs, timeout(10000)).delete(app1Dir, true);
|
verify(mockFs, timeout(10000)).delete(app1.path, true);
|
||||||
//app2Dir is not expected to be deleted since its below the threshold
|
//app2Dir is not expected to be deleted since it is below the threshold
|
||||||
verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
|
verify(mockFs, timeout(3000).times(0)).delete(app2.path, true);
|
||||||
|
|
||||||
//Now,lets change the confs
|
//Now, let's change the confs
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
|
conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50);
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
||||||
"2");
|
checkIntervalSeconds);
|
||||||
//We have not called refreshLogSettings,hence don't expect to see the changed conf values
|
//We have not called refreshLogSettings,hence don't expect to see the changed conf values
|
||||||
Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
|
assertTrue(checkIntervalMilliSeconds != deletionSvc.getCheckIntervalMsecs());
|
||||||
|
|
||||||
//refresh the log settings
|
//refresh the log settings
|
||||||
deletionSvc.refreshLogRetentionSettings();
|
deletionSvc.refreshLogRetentionSettings();
|
||||||
|
|
||||||
//Check interval time should reflect the new value
|
//Check interval time should reflect the new value
|
||||||
Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
|
Assert.assertEquals(checkIntervalMilliSeconds, deletionSvc.getCheckIntervalMsecs());
|
||||||
//app2Dir should be deleted since it falls above the threshold
|
//app2Dir should be deleted since it falls above the threshold
|
||||||
verify(mockFs, timeout(10000)).delete(app2Dir, true);
|
verify(mockFs, timeout(10000)).delete(app2.path, true);
|
||||||
deletionSvc.stop();
|
deletionSvc.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCheckInterval() throws Exception {
|
public void testCheckInterval() throws Exception {
|
||||||
long RETENTION_SECS = 10 * 24 * 3600;
|
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
long toDeleteTime = now - RETENTION_SECS*1000;
|
long toDeleteTime = now - TEN_DAYS_IN_SECONDS * 1000;
|
||||||
|
|
||||||
String root = "mockfs://foo/";
|
|
||||||
String remoteRootLogDir = root+"tmp/logs";
|
|
||||||
String suffix = "logs";
|
|
||||||
String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
|
|
||||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
|
|
||||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
|
||||||
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
|
|
||||||
LogAggregationTFileController.class.getName());
|
|
||||||
|
|
||||||
|
Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
|
||||||
|
|
||||||
// prevent us from picking up the same mockfs instance from another test
|
// prevent us from picking up the same mockfs instance from another test
|
||||||
FileSystem.closeAll();
|
FileSystem.closeAll();
|
||||||
Path rootPath = new Path(root);
|
Path rootPath = new Path(ROOT);
|
||||||
FileSystem rootFs = rootPath.getFileSystem(conf);
|
FileSystem rootFs = rootPath.getFileSystem(conf);
|
||||||
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
|
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
|
||||||
|
|
||||||
Path remoteRootLogPath = new Path(remoteRootLogDir);
|
Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
|
||||||
|
|
||||||
Path userDir = new Path(remoteRootLogPath, "me");
|
PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, now);
|
||||||
FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir);
|
PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, now);
|
||||||
|
|
||||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
|
||||||
new FileStatus[]{userDirStatus});
|
|
||||||
|
|
||||||
ApplicationId appId1 =
|
ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
|
||||||
Path suffixDir = new Path(userDir, newSuffix);
|
USER_ME, SUFFIX, appId1, now);
|
||||||
FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now,
|
|
||||||
suffixDir);
|
|
||||||
Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
|
|
||||||
remoteRootLogPath, "me", suffix, appId1);
|
|
||||||
Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
|
|
||||||
remoteRootLogPath, appId1, "me", suffix);
|
|
||||||
FileStatus bucketDirStatus = new FileStatus(0, true, 0,
|
|
||||||
0, now, bucketDir);
|
|
||||||
|
|
||||||
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
|
PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
|
||||||
|
USER_ME, SUFFIX, now);
|
||||||
|
PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, now);
|
||||||
|
|
||||||
when(mockFs.listStatus(userDir)).thenReturn(
|
when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
|
||||||
new FileStatus[] {suffixDirStatus});
|
when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
|
||||||
when(mockFs.listStatus(suffixDir)).thenReturn(
|
when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus});
|
||||||
new FileStatus[] {bucketDirStatus});
|
when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
|
||||||
when(mockFs.listStatus(bucketDir)).thenReturn(
|
|
||||||
new FileStatus[] {app1DirStatus});
|
|
||||||
|
|
||||||
Path app1Log1 = new Path(app1Dir, "host1");
|
final List<ApplicationId> finishedApplications = Collections.singletonList(appId1);
|
||||||
FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
|
|
||||||
|
|
||||||
when(mockFs.listStatus(app1Dir)).thenReturn(
|
AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
|
||||||
new FileStatus[]{app1Log1Status});
|
finishedApplications);
|
||||||
|
|
||||||
final List<ApplicationId> finishedApplications =
|
|
||||||
Collections.unmodifiableList(Arrays.asList(appId1));
|
|
||||||
|
|
||||||
AggregatedLogDeletionService deletionSvc =
|
|
||||||
new AggregatedLogDeletionService() {
|
|
||||||
@Override
|
|
||||||
protected ApplicationClientProtocol createRMClient()
|
|
||||||
throws IOException {
|
|
||||||
try {
|
|
||||||
return createMockRMClient(finishedApplications, null);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
protected void stopRMClient() {
|
|
||||||
// DO NOTHING
|
|
||||||
}
|
|
||||||
};
|
|
||||||
deletionSvc.init(conf);
|
deletionSvc.init(conf);
|
||||||
deletionSvc.start();
|
deletionSvc.start();
|
||||||
|
|
||||||
verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
|
verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
|
||||||
verify(mockFs, never()).delete(app1Dir, true);
|
verify(mockFs, never()).delete(app1.path, true);
|
||||||
|
|
||||||
// modify the timestamp of the logs and verify it's picked up quickly
|
// modify the timestamp of the logs and verify if it's picked up quickly
|
||||||
bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir);
|
app1.changeModificationTime(toDeleteTime);
|
||||||
app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
|
app1Log1.changeModificationTime(toDeleteTime);
|
||||||
app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
|
bucketDir.changeModificationTime(toDeleteTime);
|
||||||
when(mockFs.listStatus(userDir)).thenReturn(
|
when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
|
||||||
new FileStatus[] {suffixDirStatus});
|
when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus });
|
||||||
when(mockFs.listStatus(suffixDir)).thenReturn(
|
when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus });
|
||||||
new FileStatus[] {bucketDirStatus });
|
when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
|
||||||
when(mockFs.listStatus(bucketDir)).thenReturn(
|
|
||||||
new FileStatus[] {app1DirStatus });
|
|
||||||
when(mockFs.listStatus(app1Dir)).thenReturn(
|
|
||||||
new FileStatus[]{app1Log1Status});
|
|
||||||
|
|
||||||
verify(mockFs, timeout(10000)).delete(app1Dir, true);
|
verify(mockFs, timeout(10000)).delete(app1.path, true);
|
||||||
|
|
||||||
deletionSvc.stop();
|
deletionSvc.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRobustLogDeletion() throws Exception {
|
public void testRobustLogDeletion() throws Exception {
|
||||||
final long RETENTION_SECS = 10 * 24 * 3600;
|
Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
|
||||||
|
|
||||||
String root = "mockfs://foo/";
|
|
||||||
String remoteRootLogDir = root+"tmp/logs";
|
|
||||||
String suffix = "logs";
|
|
||||||
String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.setClass("fs.mockfs.impl", MockFileSystem.class,
|
|
||||||
FileSystem.class);
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
|
||||||
"1");
|
|
||||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
|
|
||||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
|
|
||||||
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
|
||||||
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
|
|
||||||
LogAggregationTFileController.class.getName());
|
|
||||||
|
|
||||||
// prevent us from picking up the same mockfs instance from another test
|
// prevent us from picking up the same mockfs instance from another test
|
||||||
FileSystem.closeAll();
|
FileSystem.closeAll();
|
||||||
Path rootPath = new Path(root);
|
Path rootPath = new Path(ROOT);
|
||||||
FileSystem rootFs = rootPath.getFileSystem(conf);
|
FileSystem rootFs = rootPath.getFileSystem(conf);
|
||||||
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
|
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
|
||||||
|
|
||||||
Path remoteRootLogPath = new Path(remoteRootLogDir);
|
Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
|
||||||
|
|
||||||
Path userDir = new Path(remoteRootLogPath, "me");
|
PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, 0);
|
||||||
Path suffixDir = new Path(userDir, newSuffix);
|
PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, 0);
|
||||||
FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
|
PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(suffixDir.path, "0", 0);
|
||||||
FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir);
|
|
||||||
Path bucketDir = new Path(suffixDir, String.valueOf(0));
|
|
||||||
FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir);
|
|
||||||
|
|
||||||
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
|
||||||
new FileStatus[]{userDirStatus});
|
when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
|
||||||
when(mockFs.listStatus(userDir)).thenReturn(
|
when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
|
||||||
new FileStatus[]{suffixStatus});
|
|
||||||
when(mockFs.listStatus(suffixDir)).thenReturn(
|
|
||||||
new FileStatus[]{bucketDirStatus});
|
|
||||||
|
|
||||||
ApplicationId appId1 =
|
ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
|
||||||
Path app1Dir = new Path(bucketDir, appId1.toString());
|
ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3);
|
||||||
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
|
|
||||||
ApplicationId appId2 =
|
|
||||||
ApplicationId.newInstance(System.currentTimeMillis(), 2);
|
|
||||||
Path app2Dir = new Path(bucketDir, "application_a");
|
|
||||||
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
|
|
||||||
ApplicationId appId3 =
|
|
||||||
ApplicationId.newInstance(System.currentTimeMillis(), 3);
|
|
||||||
Path app3Dir = new Path(bucketDir, appId3.toString());
|
|
||||||
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
|
|
||||||
|
|
||||||
when(mockFs.listStatus(bucketDir)).thenReturn(
|
PathWithFileStatus app1 = createDirLogPathWithFileStatus(bucketDir.path, appId1.toString(), 0);
|
||||||
new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
|
PathWithFileStatus app2 = createDirLogPathWithFileStatus(bucketDir.path, "application_a", 0);
|
||||||
when(mockFs.listStatus(app2Dir)).thenReturn(
|
PathWithFileStatus app3 = createDirLogPathWithFileStatus(bucketDir.path, appId3.toString(), 0);
|
||||||
new FileStatus[]{});
|
PathWithFileStatus app3Log3 = createDirLogPathWithFileStatus(app3.path, DIR_HOST1, 0);
|
||||||
|
|
||||||
when(mockFs.listStatus(app1Dir)).thenThrow(
|
when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{
|
||||||
new RuntimeException("Should Be Caught and Logged"));
|
app1.fileStatus,app2.fileStatus, app3.fileStatus});
|
||||||
Path app3Log3 = new Path(app3Dir, "host1");
|
when(mockFs.listStatus(app1.path)).thenThrow(
|
||||||
FileStatus app3Log3Status = new FileStatus(10, false, 1, 1, 0, app3Log3);
|
new RuntimeException("Should be caught and logged"));
|
||||||
when(mockFs.listStatus(app3Dir)).thenReturn(
|
when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{});
|
||||||
new FileStatus[]{app3Log3Status});
|
when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log3.fileStatus});
|
||||||
|
|
||||||
final List<ApplicationId> finishedApplications =
|
final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
|
||||||
Collections.unmodifiableList(Arrays.asList(appId1, appId3));
|
Arrays.asList(appId1, appId3));
|
||||||
|
|
||||||
ApplicationClientProtocol rmClient =
|
ApplicationClientProtocol rmClient = createMockRMClient(finishedApplications, null);
|
||||||
createMockRMClient(finishedApplications, null);
|
|
||||||
AggregatedLogDeletionService.LogDeletionTask deletionTask =
|
AggregatedLogDeletionService.LogDeletionTask deletionTask =
|
||||||
new AggregatedLogDeletionService.LogDeletionTask(conf,
|
new AggregatedLogDeletionService.LogDeletionTask(conf, TEN_DAYS_IN_SECONDS, rmClient);
|
||||||
RETENTION_SECS,
|
|
||||||
rmClient);
|
|
||||||
deletionTask.run();
|
deletionTask.run();
|
||||||
verify(mockFs).delete(app3Dir, true);
|
verify(mockFs).delete(app3.path, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
static class MockFileSystem extends FilterFileSystem {
|
static class MockFileSystem extends FilterFileSystem {
|
||||||
MockFileSystem() {
|
MockFileSystem() {
|
||||||
super(mock(FileSystem.class));
|
super(mock(FileSystem.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void initialize(URI name, Configuration conf) throws IOException {}
|
public void initialize(URI name, Configuration conf) throws IOException {}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ApplicationClientProtocol createMockRMClient(
|
private static ApplicationClientProtocol createMockRMClient(
|
||||||
List<ApplicationId> finishedApplicaitons,
|
List<ApplicationId> finishedApplications,
|
||||||
List<ApplicationId> runningApplications) throws Exception {
|
List<ApplicationId> runningApplications) throws Exception {
|
||||||
final ApplicationClientProtocol mockProtocol =
|
final ApplicationClientProtocol mockProtocol = mock(ApplicationClientProtocol.class);
|
||||||
mock(ApplicationClientProtocol.class);
|
if (finishedApplications != null && !finishedApplications.isEmpty()) {
|
||||||
if (finishedApplicaitons != null && !finishedApplicaitons.isEmpty()) {
|
for (ApplicationId appId : finishedApplications) {
|
||||||
for (ApplicationId appId : finishedApplicaitons) {
|
GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
|
||||||
GetApplicationReportRequest request =
|
GetApplicationReportResponse response = createApplicationReportWithFinishedApplication();
|
||||||
GetApplicationReportRequest.newInstance(appId);
|
when(mockProtocol.getApplicationReport(request)).thenReturn(response);
|
||||||
GetApplicationReportResponse response =
|
|
||||||
createApplicationReportWithFinishedApplication();
|
|
||||||
when(mockProtocol.getApplicationReport(request))
|
|
||||||
.thenReturn(response);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (runningApplications != null && !runningApplications.isEmpty()) {
|
if (runningApplications != null && !runningApplications.isEmpty()) {
|
||||||
for (ApplicationId appId : runningApplications) {
|
for (ApplicationId appId : runningApplications) {
|
||||||
GetApplicationReportRequest request =
|
GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
|
||||||
GetApplicationReportRequest.newInstance(appId);
|
GetApplicationReportResponse response = createApplicationReportWithRunningApplication();
|
||||||
GetApplicationReportResponse response =
|
when(mockProtocol.getApplicationReport(request)).thenReturn(response);
|
||||||
createApplicationReportWithRunningApplication();
|
|
||||||
when(mockProtocol.getApplicationReport(request))
|
|
||||||
.thenReturn(response);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return mockProtocol;
|
return mockProtocol;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static GetApplicationReportResponse
|
private static GetApplicationReportResponse createApplicationReportWithRunningApplication() {
|
||||||
createApplicationReportWithRunningApplication() {
|
|
||||||
ApplicationReport report = mock(ApplicationReport.class);
|
ApplicationReport report = mock(ApplicationReport.class);
|
||||||
when(report.getYarnApplicationState()).thenReturn(
|
when(report.getYarnApplicationState()).thenReturn(
|
||||||
YarnApplicationState.RUNNING);
|
YarnApplicationState.RUNNING);
|
||||||
|
@ -561,14 +436,65 @@ public class TestAggregatedLogDeletionService {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static GetApplicationReportResponse
|
private static GetApplicationReportResponse createApplicationReportWithFinishedApplication() {
|
||||||
createApplicationReportWithFinishedApplication() {
|
|
||||||
ApplicationReport report = mock(ApplicationReport.class);
|
ApplicationReport report = mock(ApplicationReport.class);
|
||||||
when(report.getYarnApplicationState()).thenReturn(
|
when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
|
||||||
YarnApplicationState.FINISHED);
|
GetApplicationReportResponse response = mock(GetApplicationReportResponse.class);
|
||||||
GetApplicationReportResponse response =
|
|
||||||
mock(GetApplicationReportResponse.class);
|
|
||||||
when(response.getApplicationReport()).thenReturn(report);
|
when(response.getApplicationReport()).thenReturn(report);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class PathWithFileStatus {
|
||||||
|
private final Path path;
|
||||||
|
private FileStatus fileStatus;
|
||||||
|
|
||||||
|
PathWithFileStatus(Path path, FileStatus fileStatus) {
|
||||||
|
this.path = path;
|
||||||
|
this.fileStatus = fileStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void changeModificationTime(long modTime) {
|
||||||
|
fileStatus = new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(),
|
||||||
|
fileStatus.getReplication(),
|
||||||
|
fileStatus.getBlockSize(), modTime, fileStatus.getPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionService {
|
||||||
|
private final List<ApplicationId> finishedApplications;
|
||||||
|
private final List<ApplicationId> runningApplications;
|
||||||
|
private final Configuration conf;
|
||||||
|
|
||||||
|
AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
|
||||||
|
List<ApplicationId> finishedApplications) {
|
||||||
|
this(runningApplications, finishedApplications, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
|
||||||
|
List<ApplicationId> finishedApplications,
|
||||||
|
Configuration conf) {
|
||||||
|
this.runningApplications = runningApplications;
|
||||||
|
this.finishedApplications = finishedApplications;
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ApplicationClientProtocol createRMClient() throws IOException {
|
||||||
|
try {
|
||||||
|
return createMockRMClient(finishedApplications, runningApplications);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Configuration createConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void stopRMClient() {
|
||||||
|
// DO NOTHING
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue